
In today's visually dominated digital era, images have become a core medium for conveying information. By some estimates, more than 3.5 billion images are generated worldwide every day, over 80% of which are distributed through the web. Crawling and managing these visual assets efficiently is not just a technical challenge but a key link in any data strategy.
This article builds a complete image-data collection and management system: an end-to-end pipeline covering intelligent crawling, quality filtering, metadata extraction, and database storage. Rather than stopping at a basic scraper, we will look at how to construct an industrial-grade image data pipeline.
The overall architecture:

```
┌──────────────────────────┐    ┌──────────────────────────┐    ┌──────────────────────────┐
│ Target site analysis     │───▶│ Distributed crawler pool │───▶│ Image preprocessing      │
│ - anti-bot analysis      │    │ - async concurrency      │    │ - quality checks         │
│ - structure parsing      │    │ - proxy pool             │    │ - format conversion      │
└──────────────────────────┘    └──────────────────────────┘    └─────────────┬────────────┘
                                                                              │
┌──────────────────────────┐    ┌──────────────────────────┐    ┌─────────────▼────────────┐
│ Data application layer   │◀───│ Smart storage layer      │◀───│ Feature extraction layer │
│ - retrieval system       │    │ - databases              │    │ - metadata extraction    │
│ - analytics platform     │    │ - file system            │    │ - feature vectorization  │
└──────────────────────────┘    └──────────────────────────┘    └──────────────────────────┘
```

```text
# requirements.txt: core technology stack
requests==2.31.0         # HTTP client
aiohttp==3.8.5           # async HTTP client
beautifulsoup4==4.12.2   # HTML parsing
selenium==4.15.0         # dynamic page rendering
Pillow==10.1.0           # image processing
opencv-python==4.8.1.78  # advanced image processing
pymongo==4.5.0           # MongoDB driver
SQLAlchemy==2.0.23       # SQL ORM
redis==5.0.1             # cache and queues
celery==5.3.4            # distributed task queue
# also used later in the article: lxml, aiofiles, ImageHash, python-magic
```
First, the asynchronous crawling engine: it fetches pages concurrently, extracts `<img>` and CSS background-image URLs, and deduplicates across workers via Redis.

```python
import asyncio
import hashlib
import logging
import re
from datetime import datetime
from typing import List, Set, Dict
from urllib.parse import urljoin, urlparse

import aiohttp
from aiohttp import TCPConnector
from bs4 import BeautifulSoup
import redis.asyncio as aioredis  # aioredis has been merged into redis-py (redis>=4.2)


class AsyncImageCrawler:
    """Core engine of the asynchronous image crawler."""

    def __init__(self,
                 max_concurrency: int = 100,
                 request_timeout: int = 30,
                 use_proxy: bool = False):
        self.max_concurrency = max_concurrency
        self.request_timeout = aiohttp.ClientTimeout(total=request_timeout)
        self.visited_urls: Set[str] = set()
        self.image_urls: Dict[str, Dict] = {}
        self.use_proxy = use_proxy
        # Connection pool configuration
        self.connector = TCPConnector(
            limit=max_concurrency,
            ssl=False,
            force_close=True,
            enable_cleanup_closed=True
        )
        # Redis connection (for distributed deduplication)
        self.redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)
        # Semaphore to cap concurrency
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> str:
        """Fetch page content asynchronously."""
        async with self.semaphore:
            try:
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'DNT': '1',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1'
                }
                async with session.get(url, headers=headers, timeout=self.request_timeout) as response:
                    response.raise_for_status()
                    content_type = response.headers.get('Content-Type', '')
                    if 'text/html' in content_type:
                        return await response.text()
                    return ''
            except Exception as e:
                logging.error(f"Failed to fetch {url}: {e}")
                return ''

    def extract_image_urls(self, html: str, base_url: str) -> List[Dict]:
        """Extract image URLs and metadata from HTML."""
        soup = BeautifulSoup(html, 'lxml')
        images = []
        for img in soup.find_all('img'):
            img_url = img.get('src') or img.get('data-src') or img.get('data-original')
            if not img_url:
                continue
            # Resolve relative URLs
            img_url = urljoin(base_url, img_url)
            # Filter out invalid images
            if not self._is_valid_image_url(img_url):
                continue
            # Collect image metadata
            metadata = {
                'url': img_url,
                'alt': img.get('alt', ''),
                'title': img.get('title', ''),
                'width': img.get('width'),
                'height': img.get('height'),
                'class': img.get('class', []),
                'page_url': base_url,
                'found_time': datetime.now().isoformat()
            }
            images.append(metadata)
        # Extract CSS background images
        for element in soup.find_all(style=True):
            style = element['style']
            if 'background-image' in style:
                bg_match = re.search(r'url\(["\']?(.*?)["\']?\)', style)
                if bg_match:
                    bg_url = urljoin(base_url, bg_match.group(1))
                    if self._is_valid_image_url(bg_url):
                        images.append({
                            'url': bg_url,
                            'type': 'background',
                            'page_url': base_url,
                            'found_time': datetime.now().isoformat()
                        })
        return images

    def _is_valid_image_url(self, url: str) -> bool:
        """Check whether a URL plausibly points to an image."""
        parsed = urlparse(url)
        path = parsed.path.lower()
        # Common image file extensions
        valid_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp',
                            '.webp', '.svg', '.tiff', '.ico'}
        if any(path.endswith(ext) for ext in valid_extensions):
            return True
        # Common image API path patterns
        image_patterns = ['/image/', '/img/', '/photo/', '/picture/']
        if any(pattern in path for pattern in image_patterns):
            return True
        return False
```
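As a smoke test, the engine can be driven against a single page. A minimal sketch, assuming the class above is importable and a local Redis is reachable (the URL is a placeholder):

```python
async def demo(url: str = "https://example.com"):
    # Build the crawler inside the running event loop
    crawler = AsyncImageCrawler(max_concurrency=10)
    async with aiohttp.ClientSession(connector=crawler.connector) as session:
        html = await crawler.fetch_page(session, url)
        found = crawler.extract_image_urls(html, url)
        print(f"Found {len(found)} candidate images")

asyncio.run(demo())
```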
To stay under rate limits and fingerprinting, a counter-anti-crawling manager rotates proxies and request headers and paces requests per domain. The original article elided the user-agent pool and request-history bookkeeping, so minimal implementations are filled in here to keep the class runnable:

```python
import asyncio
import random
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List


class AntiAntiCrawler:
    """Manager for counter-anti-crawling strategies."""

    def __init__(self):
        self.proxy_pool = self._init_proxy_pool()
        self.user_agents = self._load_user_agents()
        # Per-domain request timestamps for adaptive pacing
        self.request_history: Dict[str, List[datetime]] = defaultdict(list)

    def _init_proxy_pool(self) -> List[str]:
        """Initialize the proxy pool (load from an API or a file in practice)."""
        return [
            'http://proxy1.example.com:8080',
            'http://proxy2.example.com:8080',
            # ...
        ]

    def _load_user_agents(self) -> List[str]:
        """Load a pool of User-Agent strings (abbreviated example set)."""
        return [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
        ]

    def get_rotating_headers(self) -> Dict:
        """Build a rotating set of request headers."""
        user_agent = random.choice(self.user_agents)
        return {
            'User-Agent': user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': random.choice(['zh-CN,zh;q=0.9', 'en-US,en;q=0.8', 'ja,en;q=0.7']),
            'Referer': self._generate_random_referer(),
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': str(random.randint(0, 1)),
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': f'max-age={random.randint(0, 3600)}'
        }

    def _generate_random_referer(self) -> str:
        """Generate a random Referer."""
        domains = ['google.com', 'bing.com', 'baidu.com', 'github.com']
        paths = ['/', '/search', '/images', '/news']
        return f'https://{random.choice(domains)}{random.choice(paths)}'

    def _get_domain_requests(self, domain: str, window_seconds: int = 60) -> List[datetime]:
        """Return requests made to this domain within the recent window."""
        cutoff = datetime.now() - timedelta(seconds=window_seconds)
        self.request_history[domain] = [t for t in self.request_history[domain] if t > cutoff]
        return self.request_history[domain]

    async def intelligent_delay(self, domain: str):
        """Adaptive delay based on the domain's recent request history."""
        self.request_history[domain].append(datetime.now())
        recent_requests = self._get_domain_requests(domain)
        if len(recent_requests) > 10:
            # High-frequency access: back off harder
            delay = random.uniform(3.0, 8.0)
        elif len(recent_requests) > 5:
            delay = random.uniform(1.5, 3.0)
        else:
            delay = random.uniform(0.5, 1.5)
        await asyncio.sleep(delay)
```
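Wiring the strategy manager into a fetch looks roughly like this; `polite_fetch` is a hypothetical helper, not part of the classes above:

```python
async def polite_fetch(session: aiohttp.ClientSession,
                       strategy: AntiAntiCrawler, url: str) -> str:
    # Pace requests per domain, then send with rotated headers
    await strategy.intelligent_delay(urlparse(url).netloc)
    async with session.get(url, headers=strategy.get_rotating_headers()) as resp:
        return await resp.text() if resp.status == 200 else ''
```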
The download layer validates every response before anything touches disk and records technical metadata:

```python
import io
from pathlib import Path

import aiofiles
import imghdr  # deprecated since Python 3.11; the magic/PIL checks below cover newer versions
import magic   # python-magic
from PIL import Image
# aiohttp, hashlib, logging, datetime and typing were imported in the crawler section


class ImageProcessor:
    """Image processor: download, validation, basic handling."""

    def __init__(self, storage_path: str = './images'):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)

    async def download_image(self, session: aiohttp.ClientSession,
                             img_info: Dict) -> Dict:
        """Download an image asynchronously and validate it."""
        img_url = img_info['url']
        try:
            # Derive a unique filename from the URL hash
            file_hash = hashlib.sha256(img_url.encode()).hexdigest()[:16]
            file_extension = self._guess_extension(img_url)
            filename = f"{file_hash}{file_extension}"
            filepath = self.storage_path / filename
            async with session.get(img_url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                if response.status != 200:
                    return {**img_info, 'status': 'failed', 'reason': 'http_error'}
                # Read the image payload
                img_data = await response.read()
                # Verify it really is an image
                if not self._is_valid_image_data(img_data):
                    return {**img_info, 'status': 'failed', 'reason': 'invalid_image'}
                # Size sanity checks
                if len(img_data) < 1024:  # under 1 KB: likely a placeholder
                    return {**img_info, 'status': 'failed', 'reason': 'too_small'}
                if len(img_data) > 50 * 1024 * 1024:  # over 50 MB
                    return {**img_info, 'status': 'failed', 'reason': 'too_large'}
                # Persist to disk
                async with aiofiles.open(filepath, 'wb') as f:
                    await f.write(img_data)
                # Extract image metadata
                metadata = await self._extract_image_metadata(filepath, img_data)
                return {
                    **img_info,
                    **metadata,
                    'status': 'success',
                    'local_path': str(filepath),
                    'file_size': len(img_data),
                    'download_time': datetime.now().isoformat()
                }
        except Exception as e:
            logging.error(f"Failed to download {img_url}: {e}")
            return {**img_info, 'status': 'failed', 'reason': str(e)}

    def _guess_extension(self, url: str) -> str:
        """Guess a file extension from the URL path (default .jpg)."""
        suffix = Path(urlparse(url).path).suffix.lower()
        return suffix if suffix in {'.jpg', '.jpeg', '.png', '.gif', '.bmp',
                                    '.webp', '.svg', '.tiff', '.ico'} else '.jpg'

    def _is_valid_image_data(self, data: bytes) -> bool:
        """Check whether raw bytes are a valid image."""
        # Method 1: imghdr signature sniffing
        if imghdr.what(None, data):
            return True
        # Method 2: libmagic MIME detection
        try:
            mime = magic.from_buffer(data, mime=True)
            if mime.startswith('image/'):
                return True
        except Exception:
            pass
        # Method 3: PIL verification
        try:
            Image.open(io.BytesIO(data)).verify()
            return True
        except Exception:
            return False

    async def _extract_image_metadata(self, filepath: Path, img_data: bytes) -> Dict:
        """Extract technical metadata from an image file."""
        try:
            with Image.open(filepath) as img:
                return {
                    'format': img.format,
                    'mode': img.mode,
                    'width': img.width,
                    'height': img.height,
                    'aspect_ratio': img.width / img.height if img.height > 0 else 0,
                    'has_alpha': 'A' in img.mode,
                    'is_animated': getattr(img, 'is_animated', False),
                    'frames': getattr(img, 'n_frames', 1)
                }
        except Exception as e:
            logging.warning(f"Failed to extract metadata: {e}")
            return {}
```
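Fanning the downloads out is then one gather call. A sketch assuming `images` is the metadata list produced by `extract_image_urls`:

```python
async def download_all(images: List[Dict]) -> List[Dict]:
    processor = ImageProcessor('./images')
    async with aiohttp.ClientSession() as session:
        tasks = [processor.download_image(session, info) for info in images]
        return await asyncio.gather(*tasks)
```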
Each stored image then receives a composite quality score. The composition, aesthetic, and uniqueness scorers were only sketched in the original; neutral placeholders keep the class runnable here:

```python
class ImageQualityAssessor:
    """Image quality assessor."""

    def assess_quality(self, image_path: Path, img_data: Dict) -> Dict:
        """Score an image across several quality dimensions."""
        scores = {
            'technical': self._technical_score(image_path),
            'composition': self._composition_score(image_path),
            'aesthetic': self._aesthetic_score(image_path),
            'uniqueness': self._uniqueness_score(img_data)
        }
        total_score = sum(scores.values()) / len(scores)
        # Quality grading
        if total_score >= 0.8:
            quality_grade = 'excellent'
        elif total_score >= 0.6:
            quality_grade = 'good'
        elif total_score >= 0.4:
            quality_grade = 'fair'
        else:
            quality_grade = 'poor'
        return {
            **scores,
            'total_score': total_score,
            'quality_grade': quality_grade,
            'assessed_at': datetime.now().isoformat()
        }

    def _technical_score(self, image_path: Path) -> float:
        """Technical quality score (resolution-based; extend with noise/blur checks)."""
        try:
            with Image.open(image_path) as img:
                score = 0.0
                # Resolution scoring
                megapixels = (img.width * img.height) / 1_000_000
                if megapixels > 8:
                    score += 0.3
                elif megapixels > 2:
                    score += 0.2
                elif megapixels > 0.5:
                    score += 0.1
                # Noise detection (simplified; OpenCV gives more accurate results)
                # Blur detection (e.g. variance of the Laplacian, sketched below)
                return min(1.0, score)
        except Exception:
            return 0.0

    # Placeholder scorers: real implementations would analyze composition,
    # aesthetics, and duplication. Neutral constants stand in here.
    def _composition_score(self, image_path: Path) -> float:
        return 0.5

    def _aesthetic_score(self, image_path: Path) -> float:
        return 0.5

    def _uniqueness_score(self, img_data: Dict) -> float:
        return 0.5
```
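The blur check hinted at in the comments is usually done with the variance of the Laplacian: sharp images have strong second derivatives. A sketch using OpenCV; the 100.0 threshold is a common rule of thumb, not a universal constant:

```python
import cv2

def is_sharp(image_path: str, threshold: float = 100.0) -> bool:
    """Heuristic sharpness test via variance of the Laplacian."""
    gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if gray is None:
        return False  # unreadable file counts as failing
    return cv2.Laplacian(gray, cv2.CV_64F).var() >= threshold
```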
Metadata is persisted in MongoDB, with GridFS optionally holding the binaries themselves:

```python
import gridfs
from pymongo import MongoClient, ASCENDING, DESCENDING


class ImageDatabase:
    """Image database manager."""

    def __init__(self,
                 mongo_uri: str = "mongodb://localhost:27017/",
                 db_name: str = "image_database"):
        self.client = MongoClient(mongo_uri,
                                  maxPoolSize=100,
                                  connectTimeoutMS=5000,
                                  socketTimeoutMS=30000)
        self.db = self.client[db_name]
        self.fs = gridfs.GridFS(self.db)
        # Create indexes
        self._create_indexes()

    def _create_indexes(self):
        """Create the indexes that the common queries rely on."""
        # Image metadata collection
        self.db.images.create_index([('url', ASCENDING)], unique=True)
        self.db.images.create_index([('domain', ASCENDING)])
        self.db.images.create_index([('download_time', DESCENDING)])
        self.db.images.create_index([('quality_score', DESCENDING)])
        self.db.images.create_index([('tags', ASCENDING)])
        self.db.images.create_index([('width', ASCENDING), ('height', ASCENDING)])
        # Image feature collection
        self.db.features.create_index([('image_id', ASCENDING)], unique=True)
        # Crawl task collection
        self.db.tasks.create_index([('status', ASCENDING)])
        self.db.tasks.create_index([('priority', DESCENDING)])

    def save_image_metadata(self, image_data: Dict) -> str:
        """Save image metadata (synchronous, since pymongo is a blocking driver)."""
        try:
            # Build the document
            document = {
                'url': image_data['url'],
                'domain': urlparse(image_data['url']).netloc,
                'metadata': {
                    'dimensions': {
                        'width': image_data.get('width'),
                        'height': image_data.get('height'),
                        'aspect_ratio': image_data.get('aspect_ratio')
                    },
                    'format': image_data.get('format'),
                    'file_size': image_data.get('file_size'),
                    'page_url': image_data.get('page_url'),
                    'alt_text': image_data.get('alt', ''),
                    'title': image_data.get('title', ''),
                    'found_time': image_data.get('found_time')
                },
                'storage': {
                    'local_path': image_data.get('local_path'),
                    'gridfs_id': None
                },
                'quality': image_data.get('quality_assessment', {}),
                'processing': {
                    'downloaded': image_data.get('status') == 'success',
                    'download_time': image_data.get('download_time'),
                    'processed': False,
                    'feature_extracted': False
                },
                'tags': self._generate_auto_tags(image_data),
                'updated_at': datetime.now()
            }
            # Optionally mirror the file into GridFS
            if 'local_path' in image_data and Path(image_data['local_path']).exists():
                with open(image_data['local_path'], 'rb') as f:
                    file_id = self.fs.put(f,
                                          filename=Path(image_data['local_path']).name,
                                          metadata={'image_url': image_data['url']})
                document['storage']['gridfs_id'] = str(file_id)
            # Upsert; created_at is set only on first insert so the same field
            # never appears in both $set and $setOnInsert (which MongoDB rejects)
            result = self.db.images.update_one(
                {'url': image_data['url']},
                {'$set': document, '$setOnInsert': {'created_at': datetime.now()}},
                upsert=True
            )
            return str(result.upserted_id) if result.upserted_id else None
        except Exception as e:
            logging.error(f"Failed to save metadata: {e}")
            return None

    def _generate_auto_tags(self, image_data: Dict) -> List[str]:
        """Generate tags automatically."""
        tags = []
        # Tags from the URL path
        url_path = urlparse(image_data['url']).path.lower()
        path_parts = [p for p in url_path.split('/') if p and len(p) > 2]
        tags.extend(path_parts)
        # Tags from ALT text
        if image_data.get('alt'):
            alt_tags = image_data['alt'].lower().split()
            tags.extend([t for t in alt_tags if len(t) > 2])
        # Tags from dimensions
        width = image_data.get('width')
        height = image_data.get('height')
        if width and height:
            if width > height:
                tags.append('landscape')
            elif height > width:
                tags.append('portrait')
            else:
                tags.append('square')
            if width >= 3840 or height >= 2160:
                tags.append('4k')
            elif width >= 1920 or height >= 1080:
                tags.append('hd')
        # Deduplicate and return
        return list(set(t for t in tags if 2 < len(t) < 50))
```
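A hypothetical query against this schema, served by the `tags` and `download_time` indexes created above:

```python
db = ImageDatabase()
# The 20 newest images tagged both 'landscape' and 'hd'
recent = (db.db.images
            .find({'tags': {'$all': ['landscape', 'hd']}})
            .sort('download_time', -1)
            .limit(20))
for doc in recent:
    print(doc['url'], doc['metadata']['dimensions'])
```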
For relational storage, the equivalent schema in SQLAlchemy (PostgreSQL dialect for the array columns):

```python
from datetime import datetime

from sqlalchemy import (create_engine, Column, Integer, String, Text, DateTime,
                        Float, Boolean, ForeignKey, JSON, BigInteger)
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()


class CrawlTask(Base):
    """Crawl task table."""
    __tablename__ = 'crawl_tasks'

    id = Column(Integer, primary_key=True)
    domain = Column(String(255), nullable=False, index=True)
    start_url = Column(Text, nullable=False)
    status = Column(String(50), default='pending', index=True)  # pending, running, completed, failed
    priority = Column(Integer, default=1)
    max_pages = Column(Integer, default=1000)
    max_images = Column(Integer, default=5000)
    settings = Column(JSON)  # crawl configuration
    created_at = Column(DateTime, default=datetime.now)
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    error_message = Column(Text)
    # Relationships
    images = relationship("ImageRecord", back_populates="task")


class ImageRecord(Base):
    """Image record table."""
    __tablename__ = 'image_records'

    id = Column(BigInteger, primary_key=True)
    task_id = Column(Integer, ForeignKey('crawl_tasks.id'), index=True)
    url = Column(Text, unique=True, nullable=False, index=True)
    domain = Column(String(255), index=True)
    file_hash = Column(String(64), index=True)  # file content hash
    local_path = Column(Text)
    # Technical metadata
    width = Column(Integer)
    height = Column(Integer)
    format = Column(String(20))
    file_size = Column(BigInteger)  # bytes
    aspect_ratio = Column(Float)
    # Content metadata
    alt_text = Column(Text)
    page_title = Column(Text)
    page_url = Column(Text)
    # Quality assessment
    quality_score = Column(Float, index=True)
    quality_grade = Column(String(20))
    # Status flags
    is_duplicate = Column(Boolean, default=False)
    is_corrupted = Column(Boolean, default=False)
    is_adult_content = Column(Boolean, default=False)
    # Tagging
    auto_tags = Column(ARRAY(String))    # auto-generated tags
    manual_tags = Column(ARRAY(String))  # manually added tags
    # Timestamps
    discovered_at = Column(DateTime)
    downloaded_at = Column(DateTime)
    processed_at = Column(DateTime)
    created_at = Column(DateTime, default=datetime.now)
    # Relationships
    task = relationship("CrawlTask", back_populates="images")


class ImageFeature(Base):
    """Image feature table (for similarity search)."""
    __tablename__ = 'image_features'

    id = Column(BigInteger, ForeignKey('image_records.id'), primary_key=True)
    # Color features
    dominant_colors = Column(ARRAY(String))  # dominant colors
    color_histogram = Column(JSON)           # color histogram
    # Texture features
    texture_features = Column(JSON)
    # Deep-learning features
    embedding_vector = Column(ARRAY(Float))  # embedding vector
    # Timestamps
    extracted_at = Column(DateTime, default=datetime.now)
    model_version = Column(String(50))


# Database initialization
engine = create_engine('postgresql://user:password@localhost/image_database')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
```
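A short usage sketch of the ORM layer, assuming the PostgreSQL instance above is reachable: enqueue a task, then pull the highest-priority pending work.

```python
session = Session()
session.add(CrawlTask(domain='example.com',
                      start_url='https://example.com/gallery',
                      priority=5))
session.commit()

next_task = (session.query(CrawlTask)
             .filter_by(status='pending')
             .order_by(CrawlTask.priority.desc())
             .first())
```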
Crawl jobs are distributed across workers with Celery. Note that the broker and backend URLs are deployment-specific; Redis is used here since it already backs deduplication:

```python
import asyncio
from datetime import datetime
from typing import List

from celery import Celery
from celery.utils.log import get_task_logger
# AsyncImageCrawler, Session and the ORM models come from the sections above

# Celery application configuration
celery_app = Celery(
    'image_crawler',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    include=['crawler.tasks']
)

# Configuration
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,  # hard 5-minute timeout
    task_soft_time_limit=240,
    worker_max_tasks_per_child=1000,
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

logger = get_task_logger(__name__)


@celery_app.task(bind=True, max_retries=3)
def crawl_domain_task(self, domain: str, start_urls: List[str]):
    """Distributed crawl task."""
    try:
        crawler = AsyncImageCrawler(max_concurrency=50)
        # Database session
        db_session = Session()
        # Create the task record
        task_record = CrawlTask(
            domain=domain,
            start_url=start_urls[0],
            status='running',
            started_at=datetime.now()
        )
        db_session.add(task_record)
        db_session.commit()
        # Run the crawl; crawl() is the orchestration entry point
        # (fetch -> extract -> download), elided in this article
        results = asyncio.run(crawler.crawl(start_urls))
        # Persist the results
        success_count = 0
        for result in results:
            if result.get('status') == 'success':
                image_record = ImageRecord(
                    task_id=task_record.id,
                    url=result['url'],
                    domain=domain,
                    local_path=result.get('local_path'),
                    width=result.get('width'),
                    height=result.get('height'),
                    format=result.get('format'),
                    file_size=result.get('file_size'),
                    alt_text=result.get('alt', '')[:500],
                    page_url=result.get('page_url'),
                    quality_score=result.get('quality_score', 0),
                    discovered_at=datetime.now(),
                    downloaded_at=datetime.now()
                )
                db_session.add(image_record)
                success_count += 1
        # Mark the task as completed
        task_record.status = 'completed'
        task_record.completed_at = datetime.now()
        db_session.commit()
        return {
            'task_id': task_record.id,
            'domain': domain,
            'total_images': len(results),
            'successful': success_count,
            'failed': len(results) - success_count
        }
    except Exception as exc:
        logger.error(f"Task failed: {exc}")
        # Mark the task as failed
        if 'db_session' in locals() and 'task_record' in locals():
            task_record.status = 'failed'
            task_record.error_message = str(exc)
            db_session.commit()
        raise self.retry(exc=exc, countdown=60)


@celery_app.task
def process_image_features(image_id: int):
    """Feature-extraction task for a stored image."""
    from feature_extractor import ImageFeatureExtractor  # feature module, not shown here

    db_session = Session()
    try:
        # Look up the image record
        image = db_session.get(ImageRecord, image_id)
        if not image:
            return {'status': 'error', 'message': 'Image not found'}
        # Extract features
        extractor = ImageFeatureExtractor()
        features = extractor.extract(image.local_path)
        # Store them
        image_feature = ImageFeature(
            id=image.id,
            dominant_colors=features['dominant_colors'],
            color_histogram=features['color_histogram'],
            texture_features=features['texture'],
            embedding_vector=features['embedding'],
            extracted_at=datetime.now(),
            model_version='v1.0'
        )
        db_session.add(image_feature)
        # Update the image status
        image.processed_at = datetime.now()
        db_session.commit()
        return {'status': 'success', 'image_id': image_id}
    except Exception as e:
        logger.error(f"Feature extraction failed: {e}")
        return {'status': 'error', 'message': str(e)}
    finally:
        db_session.close()
```
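Dispatching from a scheduler is then one call per domain. A sketch, assuming a worker is already running (started with the usual `celery -A ... worker` command):

```python
# Enqueue a crawl and block on the summary (fire-and-forget also works)
result = crawl_domain_task.delay('example.com', ['https://example.com/gallery'])
summary = result.get(timeout=600)
print(summary['successful'], 'images stored')
```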
Operational visibility comes from structured JSON logs plus in-process counters:

```python
import json
import logging
from datetime import datetime
from logging.handlers import RotatingFileHandler
from pathlib import Path


class MonitoringSystem:
    """Monitoring system."""

    def __init__(self):
        self.setup_logging()
        self.metrics = {
            'crawling': {'requests': 0, 'success': 0, 'failed': 0},
            'downloading': {'images': 0, 'success': 0, 'failed': 0},
            'processing': {'images': 0, 'success': 0, 'failed': 0},
            'database': {'inserts': 0, 'updates': 0, 'errors': 0}
        }

    def setup_logging(self):
        """Configure structured logging."""
        Path('logs').mkdir(exist_ok=True)
        self.logger = logging.getLogger('image_crawler')
        self.logger.setLevel(logging.INFO)
        # File handler (JSON format)
        file_handler = RotatingFileHandler(
            'logs/crawler.log',
            maxBytes=100 * 1024 * 1024,  # 100 MB
            backupCount=10
        )

        # JSON formatter
        class JsonFormatter(logging.Formatter):
            def format(self, record):
                log_data = {
                    'timestamp': datetime.now().isoformat(),
                    'level': record.levelname,
                    'logger': record.name,
                    'message': record.getMessage(),
                    'module': record.module,
                    'function': record.funcName,
                    'line': record.lineno
                }
                # Values passed via the `extra` kwarg land directly on the
                # record, so look them up by attribute name
                if hasattr(record, 'metrics_report'):
                    log_data['metrics_report'] = record.metrics_report
                if record.exc_info:
                    log_data['exception'] = self.formatException(record.exc_info)
                return json.dumps(log_data)

        file_handler.setFormatter(JsonFormatter())
        self.logger.addHandler(file_handler)
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        self.logger.addHandler(console_handler)

    def record_metric(self, category: str, metric: str, value: int = 1):
        """Record a metric."""
        if category in self.metrics and metric in self.metrics[category]:
            self.metrics[category][metric] += value
        # Periodic reporting
        if sum(self.metrics['crawling'].values()) % 100 == 0:
            self.report_metrics()

    def report_metrics(self):
        """Report the current metrics."""
        report = {
            'timestamp': datetime.now().isoformat(),
            'metrics': self.metrics,
            'summary': {
                'total_requests': self.metrics['crawling']['requests'],
                'success_rate': (
                    self.metrics['crawling']['success'] /
                    max(self.metrics['crawling']['requests'], 1)
                ),
                'total_images': self.metrics['downloading']['images'],
                'image_success_rate': (
                    self.metrics['downloading']['success'] /
                    max(self.metrics['downloading']['images'], 1)
                )
            }
        }
        self.logger.info('Metrics report', extra={'metrics_report': report})
        # Counters are left cumulative here; adjust the reset logic as needed
```
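Each pipeline stage then bumps its counters with one call. A minimal usage sketch:

```python
monitor = MonitoringSystem()
monitor.record_metric('crawling', 'requests')
monitor.record_metric('crawling', 'success')
monitor.record_metric('downloading', 'images')
monitor.report_metrics()  # emits a structured 'Metrics report' log record
```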
Near-duplicate detection relies on perceptual hashing via the `imagehash` package. Note that hash comparison yields a distance, so duplicates are pairs *at or below* the threshold:

```python
import hashlib
import logging
from pathlib import Path
from typing import Dict, List

import imagehash
from PIL import Image


class DuplicateDetector:
    """Duplicate image detector."""

    def __init__(self, hash_size: int = 8):
        self.hash_size = hash_size

    def calculate_hashes(self, image_path: Path) -> Dict:
        """Compute several kinds of image hashes."""
        try:
            with Image.open(image_path) as img:
                # Normalize to RGB
                if img.mode not in ['RGB', 'L']:
                    img = img.convert('RGB')
                # Perceptual hashes (each resizes internally)
                img_hash = imagehash.average_hash(img, self.hash_size)
                phash = imagehash.phash(img, self.hash_size)
                dhash = imagehash.dhash(img, self.hash_size)
                whash = imagehash.whash(img, self.hash_size)
                # Color histogram fingerprint
                hist = img.histogram()
                color_hash = hashlib.md5(str(hist).encode()).hexdigest()[:16]
                return {
                    'average_hash': str(img_hash),
                    'perceptual_hash': str(phash),
                    'difference_hash': str(dhash),
                    'wavelet_hash': str(whash),
                    'color_hash': color_hash,
                    'file_size': image_path.stat().st_size,
                    'dimensions': f"{img.width}x{img.height}"
                }
        except Exception as e:
            logging.error(f"Failed to calculate hashes: {e}")
            return {}

    def find_duplicates(self, image_hashes: List[Dict], threshold: int = 5) -> List[List[str]]:
        """Group images whose combined hash distance is at or below the threshold."""
        duplicates = []
        processed = set()
        for i, hash1 in enumerate(image_hashes):
            if i in processed:
                continue
            group = [hash1['image_id']]
            for j, hash2 in enumerate(image_hashes[i+1:], start=i+1):
                if j in processed:
                    continue
                # Smaller distance means more similar
                distance = self._hash_distance(hash1, hash2)
                if distance <= threshold:
                    group.append(hash2['image_id'])
                    processed.add(j)
            if len(group) > 1:
                duplicates.append(group)
            processed.add(i)
        return duplicates

    def _hash_distance(self, hash1: Dict, hash2: Dict) -> int:
        """Combined hash distance (0 = identical)."""
        scores = []
        # Average-hash comparison
        if 'average_hash' in hash1 and 'average_hash' in hash2:
            h1 = imagehash.hex_to_hash(hash1['average_hash'])
            h2 = imagehash.hex_to_hash(hash2['average_hash'])
            scores.append(h1 - h2)  # Hamming distance: smaller is more similar
        # File-size comparison
        if 'file_size' in hash1 and 'file_size' in hash2:
            size_diff = abs(hash1['file_size'] - hash2['file_size'])
            size_score = size_diff / max(hash1['file_size'], hash2['file_size'])
            scores.append(size_score * 64)  # scale to an approximate hash distance
        # Combined score
        return int(sum(scores) / len(scores)) if scores else 100
```
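Running the detector over the download directory might look like this; here `image_id` is just the filename, standing in for the database id:

```python
detector = DuplicateDetector()
hashes = []
for path in Path('./images').glob('*'):
    h = detector.calculate_hashes(path)
    if h:
        h['image_id'] = path.name  # stand-in for the DB id
        hashes.append(h)

for group in detector.find_duplicates(hashes, threshold=5):
    print('duplicates:', group)
```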
Re-crawls should touch only pages that have changed since the last completed run:

```python
from typing import Optional

import requests
# Session, CrawlTask and ImageRecord are the SQLAlchemy objects defined above;
# urlparse and datetime were imported earlier


class IncrementalCrawler:
    """Incremental crawl manager."""

    def __init__(self, domain: str):
        self.domain = domain
        self.db_session = Session()

    def get_last_crawl_time(self) -> Optional[datetime]:
        """Return the completion time of the last successful crawl."""
        last_task = self.db_session.query(CrawlTask)\
            .filter_by(domain=self.domain, status='completed')\
            .order_by(CrawlTask.completed_at.desc())\
            .first()
        return last_task.completed_at if last_task else None

    def check_for_updates(self, url: str) -> bool:
        """Decide whether a URL needs to be re-crawled."""
        # Honor robots.txt
        if not self._check_robots_txt(url):
            return False
        # Compare against the page's Last-Modified header
        last_modified = self._get_last_modified(url)
        if not last_modified:
            return True  # unknown: re-crawl to be safe
        # Look at the images already stored for this page
        page_images = self.db_session.query(ImageRecord)\
            .filter_by(page_url=url)\
            .all()
        if not page_images:
            return True  # new page
        # Re-crawl if the page changed after our latest download
        latest_download = max(img.downloaded_at for img in page_images)
        # Last-Modified parses as timezone-aware; drop tzinfo to compare
        # with the naive timestamps stored in the database
        return last_modified.replace(tzinfo=None) > latest_download

    def _check_robots_txt(self, url: str) -> bool:
        """Consult robots.txt."""
        parsed = urlparse(url)
        robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
        try:
            response = requests.get(robots_url, timeout=10)
            if response.status_code == 200:
                from urllib.robotparser import RobotFileParser
                rp = RobotFileParser()
                rp.parse(response.text.splitlines())
                return rp.can_fetch('*', url)
        except Exception:
            pass
        return True  # default to allowed when robots.txt is unreachable

    def _get_last_modified(self, url: str) -> Optional[datetime]:
        """Fetch the page's Last-Modified timestamp."""
        try:
            response = requests.head(url, timeout=10)
            last_modified = response.headers.get('Last-Modified')
            if last_modified:
                from email.utils import parsedate_to_datetime
                return parsedate_to_datetime(last_modified)
        except Exception:
            pass
        return None
```
A domain-specific subclass shows how to specialize the engine, here for e-commerce product pages. Three site-specific helpers (`_extract_image_data`, `_extract_product_links`, `crawl_product_page`) are referenced but elided, since their logic depends entirely on the target site:

```python
import random
import re


class EcommerceImageCrawler(AsyncImageCrawler):
    """Crawler specialized for e-commerce sites."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.product_patterns = [
            r'/product/',
            r'/item/',
            r'/goods/',
            r'/p/\d+',
            r'/detail/'
        ]

    async def extract_product_images(self, html: str, base_url: str) -> List[Dict]:
        """Extract product images (main images and thumbnails)."""
        soup = BeautifulSoup(html, 'lxml')
        images = []
        # Locate the product image containers
        product_selectors = [
            '.product-image',
            '.item-img',
            '.goods-pic',
            '[class*="gallery"]',
            '[class*="slider"]'
        ]
        for selector in product_selectors:
            for container in soup.select(selector):
                # Main images
                large_images = container.find_all('img', {'src': True})
                for img in large_images:
                    img_data = await self._extract_image_data(img, base_url, 'product_large')
                    if img_data:
                        images.append(img_data)
                # Thumbnails (lazy-loaded via data-src)
                thumbnails = container.find_all('img', {'data-src': True})
                for thumb in thumbnails:
                    img_data = await self._extract_image_data(thumb, base_url, 'product_thumb')
                    if img_data:
                        images.append(img_data)
        return images

    async def crawl_product_catalog(self, category_url: str) -> List[Dict]:
        """Crawl a product catalog page by page."""
        all_products = []
        async with aiohttp.ClientSession() as session:
            page = 1
            while True:
                # Build the paginated URL (adjust to the site's structure)
                paginated_url = f"{category_url}?page={page}"
                html = await self.fetch_page(session, paginated_url)
                if not html:
                    break
                # Extract product links
                product_links = self._extract_product_links(html)
                if not product_links:
                    break
                # Crawl detail pages concurrently
                tasks = [self.crawl_product_page(session, link) for link in product_links]
                product_results = await asyncio.gather(*tasks, return_exceptions=True)
                # Keep successful results
                for result in product_results:
                    if isinstance(result, dict):
                        all_products.append(result)
                page += 1
                # Pause to avoid bans
                await asyncio.sleep(random.uniform(1.0, 3.0))
        return all_products
```
Performance tuning concentrates on three fronts: batched writes, bounded memory, and connection reuse:

```python
import gc
import json
import tracemalloc

import requests


class PerformanceOptimizer:
    """Performance optimizer."""

    @staticmethod
    def optimize_database_queries(session):
        """Optimize database access."""
        # 1. Batch inserts instead of row-at-a-time commits
        def batch_insert(records, batch_size=1000):
            for i in range(0, len(records), batch_size):
                batch = records[i:i+batch_size]
                session.bulk_insert_mappings(ImageRecord, batch)
                session.commit()
        # 2. Use index hints where the planner needs help
        # 3. Size the connection pool to the workload
        return session

    @staticmethod
    def optimize_memory_usage():
        """Optimize memory usage."""
        # Enable allocation tracking
        tracemalloc.start()
        # Periodic garbage collection
        gc.collect()

        # Stream large datasets through a generator instead of loading them whole
        def process_large_dataset(file_path):
            with open(file_path, 'r') as f:
                for line in f:
                    yield json.loads(line)

        return {
            'strategy': 'generator_based',
            'gc_enabled': gc.isenabled(),
            'memory_snapshot': tracemalloc.take_snapshot()
        }

    @staticmethod
    def optimize_network_requests():
        """Optimize network requests."""
        # Reuse connections via a shared session
        session = requests.Session()
        # Pooled adapter with retries
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=100,
            pool_maxsize=100,
            max_retries=3
        )
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        return session
```
Finally, crawling should stay within legal and ethical bounds, enforced in code. The content and copyright checks call helpers whose implementations are elided here (`_contains_prohibited_content`, `_has_copyright_restrictions`):

```python
class EthicalCrawler:
    """Ethical crawling manager."""

    def __init__(self):
        self.rules = self._load_crawling_rules()

    def _load_crawling_rules(self) -> Dict:
        """Load crawling rules."""
        return {
            'respect_robots': True,
            'delay_between_requests': 1.0,  # seconds
            'max_pages_per_domain': 1000,
            'max_images_per_domain': 5000,
            'prohibited_content': ['adult', 'violent', 'hate_speech'],
            'copyright_respect': True
        }

    async def check_compliance(self, url: str) -> bool:
        """Check compliance before crawling a URL."""
        # 1. robots.txt
        if self.rules['respect_robots']:
            if not await self._can_fetch(url):
                return False
        # 2. Content restrictions
        if await self._contains_prohibited_content(url):
            return False
        # 3. Copyright notices
        if self.rules['copyright_respect']:
            if await self._has_copyright_restrictions(url):
                return False
        return True

    async def _can_fetch(self, url: str) -> bool:
        """Consult robots.txt."""
        parsed = urlparse(url)
        robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(robots_url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    if response.status == 200:
                        content = await response.text()
                        from urllib.robotparser import RobotFileParser
                        rp = RobotFileParser()
                        rp.parse(content.splitlines())
                        return rp.can_fetch('ImageCrawlerBot/1.0', url)
        except Exception:
            pass
        return True
```
For deployment, the service containerizes cleanly:

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libssl-dev \
    libffi-dev \
    libjpeg-dev \
    zlib1g-dev \
    libmagic1 \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create a non-root user
RUN useradd -m -u 1000 crawler && \
    chown -R crawler:crawler /app
USER crawler

# Entrypoint
CMD ["python", "main.py"]
```
Kubernetes manifests then scale the workers horizontally:

```yaml
# kubernetes/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: image-crawler
spec:
  replicas: 3
  selector:
    matchLabels:
      app: image-crawler
  template:
    metadata:
      labels:
        app: image-crawler
    spec:
      containers:
        - name: crawler
          image: image-crawler:latest
          env:
            - name: REDIS_HOST
              value: "redis-service"
            - name: MONGODB_URI
              value: "mongodb://mongodb-service:27017"
            - name: POSTGRES_HOST
              value: "postgres-service"
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          volumeMounts:
            - name: storage
              mountPath: /app/images
      volumes:
        - name: storage
          persistentVolumeClaim:
            claimName: image-storage-pvc
---
# kubernetes/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: crawler-service
spec:
  selector:
    app: image-crawler
  ports:
    - port: 8000
      targetPort: 8000
```

The system described here is more than a technical exercise: it is a complete image data management solution. From intelligent crawling and quality control to distributed storage, each stage is designed for performance, reliability, and scalability.
As visual data grows ever more valuable, a good image data management system is not just a tool but core infrastructure for your digital assets. With the architecture and implementations presented here, you can build image data solutions that are both powerful and flexible, supplying high-quality visual data to a wide range of applications.
Remember that the technology is only a means; the real value lies in what you build with the data. May your journey through image data be full of discovery and innovation.
Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
For takedown requests, please contact cloudcommunity@tencent.com.