
情感分析(Sentiment Analysis),又称意见挖掘(Opinion Mining),是自然语言处理(NLP)领域的核心任务之一,旨在自动识别和提取文本中的情感信息。随着社交媒体的普及和用户生成内容的爆炸式增长,情感分析技术在商业决策、舆情监测、产品开发等领域发挥着越来越重要的作用。
情感分析技术演进
基于词典方法 → 机器学习方法 → 深度学习方法 → 预训练语言模型方法
(情感词典)　　(SVM, LR)　　(CNN, RNN)　　(BERT, GPT)

情感(Sentiment):文本表达的主观态度、情绪或评价,通常分为积极、消极和中性。
极性(Polarity):情感的方向,如积极、消极、中性,是情感分析最基本的任务。
强度(Intensity):情感的强烈程度,如非常积极、比较积极、略微积极等。
方面(Aspect):评价的具体对象或属性,如产品的价格、质量、服务等。
细粒度情感分析:针对特定方面或实体的情感分析,提供更详细的情感信息。
多模态情感分析:结合文本、图像、音频、视频等多种模态信息进行情感分析。
根据分析粒度和任务目标的不同,情感分析可以分为以下主要变体:
| 变体类型 | 任务描述 | 输出结果 | 应用场景 |
|---|---|---|---|
| 极性分类 | 判断文本的整体情感倾向 | 积极/消极/中性 | 产品评论分析、舆情监测 |
| 情感强度分析 | 评估情感的强烈程度 | 0-1之间的连续值 | 精确情感评估、情感演化分析 |
| 方面级情感分析 | 识别特定方面的情感 | 方面-情感对 | 产品改进、用户需求分析 |
| 实体级情感分析 | 识别特定实体的情感 | 实体-情感对 | 品牌声誉管理、竞争分析 |
| 多模态情感分析 | 结合多模态信息进行分析 | 综合情感评分 | 社交媒体分析、内容推荐 |
| 跨语言情感分析 | 处理不同语言的情感文本 | 统一情感表示 | 全球市场分析、多语言内容处理 |
| 上下文感知情感分析 | 考虑上下文信息的分析 | 上下文相关情感 | 对话系统、交互式应用 |
根据2025年最新研究,情感分析技术呈现以下发展趋势:
情感词典是情感分析的基础资源,包含具有特定情感倾向的词汇及其强度值。
主要情感词典类型:
常用情感词典资源:
构建自定义情感词典的方法:
情感分析模型的评估需要使用适当的指标,根据任务类型的不同选择合适的评估方法。
分类任务评估指标:
回归任务评估指标:
Python实现示例(评估指标计算):
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix, roc_auc_score, classification_report
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
class SentimentEvaluation:
    """Compute and display evaluation metrics for sentiment models.

    Supports classification metrics (accuracy/precision/recall/F1, confusion
    matrix, optional ROC AUC for binary tasks) and regression metrics
    (MSE/RMSE/MAE/R²), plus simple visualization and printing helpers.
    """

    def __init__(self):
        pass

    def evaluate_classification(self, y_true, y_pred, labels=None, average='weighted'):
        """Evaluate a classification task.

        Args:
            y_true: ground-truth labels.
            y_pred: predicted labels.
            labels: optional explicit label list passed through to sklearn.
            average: averaging strategy for multi-class precision/recall/F1.

        Returns:
            dict with accuracy, precision, recall, f1_score, confusion_matrix,
            classification_report and (binary tasks only) roc_auc.
        """
        results = {}
        # Basic metrics; zero_division=0 silences warnings for absent classes.
        results['accuracy'] = accuracy_score(y_true, y_pred)
        results['precision'] = precision_score(y_true, y_pred, labels=labels, average=average, zero_division=0)
        results['recall'] = recall_score(y_true, y_pred, labels=labels, average=average, zero_division=0)
        results['f1_score'] = f1_score(y_true, y_pred, labels=labels, average=average, zero_division=0)
        # Confusion matrix and the full per-class report.
        cm = confusion_matrix(y_true, y_pred, labels=labels)
        results['confusion_matrix'] = cm
        report = classification_report(y_true, y_pred, labels=labels, zero_division=0)
        results['classification_report'] = report
        # ROC AUC only makes sense for binary tasks.
        if len(set(y_true)) == 2 and len(set(y_pred)) == 2:
            try:
                results['roc_auc'] = roc_auc_score(y_true, y_pred)
            except ValueError:
                # BUGFIX: was a bare `except:` which hid every error (including
                # typos/KeyboardInterrupt); only sklearn input errors are expected.
                results['roc_auc'] = None
        return results

    def evaluate_regression(self, y_true, y_pred):
        """Evaluate a regression task; returns a dict with mse/rmse/mae/r2."""
        results = {}
        results['mse'] = mean_squared_error(y_true, y_pred)
        results['rmse'] = np.sqrt(results['mse'])
        results['mae'] = mean_absolute_error(y_true, y_pred)
        results['r2'] = r2_score(y_true, y_pred)
        return results

    def plot_confusion_matrix(self, cm, classes, title='Confusion Matrix', cmap=plt.cm.Blues):
        """Render a confusion matrix as a heatmap; returns the pyplot module."""
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap=cmap, xticklabels=classes, yticklabels=classes)
        plt.title(title)
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.tight_layout()
        return plt

    def print_evaluation_results(self, results, is_classification=True):
        """Pretty-print the dict produced by the evaluate_* methods."""
        if is_classification:
            print("Classification Evaluation Results:")
            print(f"Accuracy: {results['accuracy']:.4f}")
            print(f"Precision: {results['precision']:.4f}")
            print(f"Recall: {results['recall']:.4f}")
            print(f"F1 Score: {results['f1_score']:.4f}")
            if 'roc_auc' in results and results['roc_auc'] is not None:
                print(f"ROC AUC: {results['roc_auc']:.4f}")
            print("\nClassification Report:")
            print(results['classification_report'])
        else:
            print("Regression Evaluation Results:")
            print(f"MSE: {results['mse']:.4f}")
            print(f"RMSE: {results['rmse']:.4f}")
            print(f"MAE: {results['mae']:.4f}")
            print(f"R²: {results['r2']:.4f}")

# Lexicon-based sentiment analysis is the most traditional and direct approach:
# it judges a text's polarity by counting the sentiment words it contains.
基本原理:
优缺点:
Python实现示例:
import re
import jieba
import jieba.posseg as pseg
from collections import defaultdict
class LexiconBasedSentimentAnalyzer:
    """Lexicon (dictionary) based sentiment analyzer.

    Scores a text by summing the polarity of its sentiment words, scaled by a
    preceding degree adverb (intensity multiplier) and flipped by a preceding
    negation word. Supports English (regex tokenization) and Chinese (jieba
    tokenization).
    """

    def __init__(self, positive_dict=None, negative_dict=None,
                 degree_words=None, negation_words=None,
                 stopwords=None, language='english'):
        """All lexicons are optional; a small built-in demo lexicon is
        installed when neither positive nor negative words are supplied."""
        self.language = language
        # Sentiment lexicons (sets of words).
        self.positive_dict = positive_dict or set()
        self.negative_dict = negative_dict or set()
        # Degree adverbs (word -> multiplier) and negation words.
        self.degree_words = degree_words or {}
        self.negation_words = negation_words or set()
        # Stopwords filtered out during tokenization.
        self.stopwords = stopwords or set()
        if not self.positive_dict and not self.negative_dict:
            self._init_default_lexicons()

    def _init_default_lexicons(self):
        """Install a small demo lexicon for the configured language."""
        if self.language == 'english':
            self.positive_dict = {'good', 'great', 'excellent', 'wonderful', 'amazing',
                                  'fantastic', 'terrific', 'outstanding', 'superb', 'awesome'}
            self.negative_dict = {'bad', 'terrible', 'horrible', 'awful', 'disappointing',
                                  'poor', 'worst', 'pathetic', 'lousy', 'horrendous'}
            self.degree_words = {'very': 2.0, 'extremely': 3.0, 'quite': 1.5,
                                 'somewhat': 0.8, 'slightly': 0.5}
            self.negation_words = {'not', 'no', 'never', 'neither', 'nor'}
            self.stopwords = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to'}
        else:  # Chinese
            self.positive_dict = {'好', '优秀', '出色', '棒', '精彩', '完美', '赞', '满意', '喜欢', '推荐'}
            self.negative_dict = {'差', '糟糕', '垃圾', '失望', '不满', '讨厌', '烂', '恶心', '差劲'}
            self.degree_words = {'非常': 2.0, '极其': 3.0, '相当': 1.5,
                                 '有点': 0.8, '稍微': 0.5, '很': 2.0, '特别': 2.5}
            self.negation_words = {'不', '没', '无', '非', '否', '不要', '没有', '从未'}
            self.stopwords = {'的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', '一个'}

    def tokenize(self, text):
        """Tokenize `text` (lowercased for English) and drop stopwords."""
        if self.language == 'english':
            # BUGFIX: the original pattern was r'\\b\\w+\\b'; inside a *raw*
            # string that is two literal backslashes, so the regex looked for
            # literal '\' characters and never matched ordinary words —
            # English tokenization always returned []. r'\b\w+\b' is the
            # intended word-boundary pattern.
            tokens = re.findall(r'\b\w+\b', text.lower())
        else:  # Chinese: segment with jieba
            tokens = [word for word in jieba.cut(text) if word.strip()]
        # Remove stopwords.
        tokens = [token for token in tokens if token not in self.stopwords]
        return tokens

    def analyze_sentiment(self, text):
        """Score a single text.

        Returns:
            dict with 'sentiment' ('positive'/'negative'/'neutral'),
            'score' (raw signed sum), 'normalized_score' (clipped to [-1, 1])
            and 'tokens' (the tokens actually scored).
        """
        tokens = self.tokenize(text)
        sentiment_score = 0
        # True when the previous token was a negation word.
        prev_is_negation = False
        # Multiplier carried over from a preceding degree adverb.
        degree_coefficient = 1.0
        for token in tokens:
            if token in self.negation_words:
                prev_is_negation = True
                continue
            if token in self.degree_words:
                degree_coefficient = self.degree_words[token]
                continue
            if token in self.positive_dict:
                score = 1.0 * degree_coefficient
                if prev_is_negation:
                    score *= -1
                    prev_is_negation = False
                sentiment_score += score
                degree_coefficient = 1.0
            elif token in self.negative_dict:
                score = -1.0 * degree_coefficient
                if prev_is_negation:
                    score *= -1
                    prev_is_negation = False
                sentiment_score += score
                degree_coefficient = 1.0
        # Map the sign of the sum to a label.
        if sentiment_score > 0:
            sentiment = 'positive'
        elif sentiment_score < 0:
            sentiment = 'negative'
        else:
            sentiment = 'neutral'
        # Normalize into [-1, 1]; the lexicon size acts as a crude upper
        # bound on the attainable raw score.
        max_abs_score = max(len(self.positive_dict), len(self.negative_dict))
        normalized_score = sentiment_score / max_abs_score if max_abs_score > 0 else 0
        normalized_score = max(-1.0, min(1.0, normalized_score))
        return {
            'sentiment': sentiment,
            'score': sentiment_score,
            'normalized_score': normalized_score,
            'tokens': tokens
        }

    def batch_analyze(self, texts):
        """Analyze many texts; returns a list of result dicts."""
        results = []
        for text in texts:
            results.append(self.analyze_sentiment(text))
        return results

    def add_sentiment_words(self, positive_words=None, negative_words=None):
        """Extend the positive/negative lexicons."""
        if positive_words:
            self.positive_dict.update(positive_words)
        if negative_words:
            self.negative_dict.update(negative_words)

    def add_degree_words(self, degree_words_dict):
        """Extend the degree-adverb multipliers."""
        self.degree_words.update(degree_words_dict)

    def add_negation_words(self, negation_words):
        """Extend the negation-word set."""
        self.negation_words.update(negation_words)
# Demo run
def test_lexicon_analyzer():
    """Run the lexicon analyzer on a few English and Chinese sentences and print the results."""
    # --- English ---
    english_samples = [
        "This product is very good and I love it.",
        "The quality is terrible and I am very disappointed.",
        "It's okay but not great.",
        "I neither like nor dislike this product."
    ]
    analyzer_en = LexiconBasedSentimentAnalyzer(language='english')
    english_outcomes = analyzer_en.batch_analyze(english_samples)
    print("English Sentiment Analysis Results:")
    for idx, (sample, outcome) in enumerate(zip(english_samples, english_outcomes)):
        print(f"\nText {idx+1}: {sample}")
        print(f"Sentiment: {outcome['sentiment']}")
        print(f"Score: {outcome['score']}")
        print(f"Normalized Score: {outcome['normalized_score']}")
    # --- Chinese ---
    chinese_samples = [
        "这个产品非常好,我很喜欢。",
        "质量很糟糕,我非常失望。",
        "还可以,但不是特别好。",
        "我既不喜欢也不讨厌这个产品。"
    ]
    analyzer_zh = LexiconBasedSentimentAnalyzer(language='chinese')
    chinese_outcomes = analyzer_zh.batch_analyze(chinese_samples)
    print("\n中文情感分析结果:")
    for idx, (sample, outcome) in enumerate(zip(chinese_samples, chinese_outcomes)):
        print(f"\n文本 {idx+1}: {sample}")
        print(f"情感: {outcome['sentiment']}")
        print(f"分数: {outcome['score']}")
        print(f"归一化分数: {outcome['normalized_score']}")
# Run the lexicon demo; jieba is an optional third-party dependency.
try:
    test_lexicon_analyzer()
except ImportError:
    # jieba is missing -- print install instructions and the expected output.
    print("请安装jieba库: pip install jieba")
    print("示例输出:")
    print("英文文本1: 情感积极,分数较高")
    print("英文文本2: 情感消极,分数较低")
    print("中文文本1: 情感积极,分数较高")
    print("中文文本2: 情感消极,分数较低")

# Machine-learning approaches treat sentiment analysis as a classification
# problem solved with feature engineering plus a classification algorithm.
常用机器学习算法:
特征工程方法:
Python实现示例(使用逻辑回归进行情感分析):
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.pipeline import Pipeline
import matplotlib.pyplot as plt
import seaborn as sns
class MLBasedSentimentAnalyzer:
    """Sentiment classifier built from a text vectorizer plus an sklearn estimator.

    The vectorizer and classifier are combined into one sklearn Pipeline so
    that training, prediction and grid search all stay consistent.
    """

    def __init__(self, model=None, vectorizer=None):
        # Defaults: TF-IDF over uni/bi-grams feeding a logistic regression.
        self.vectorizer = vectorizer or TfidfVectorizer(max_features=5000, ngram_range=(1, 2))
        self.model = model or LogisticRegression(random_state=42, max_iter=1000)
        self.pipeline = Pipeline([
            ('vectorizer', self.vectorizer),
            ('classifier', self.model),
        ])

    def train(self, X_train, y_train):
        """Fit the full pipeline; returns self for chaining."""
        self.pipeline.fit(X_train, y_train)
        return self

    def predict(self, X_test):
        """Predict sentiment labels for raw texts."""
        return self.pipeline.predict(X_test)

    def predict_proba(self, X_test):
        """Predict per-class probabilities for raw texts."""
        return self.pipeline.predict_proba(X_test)

    def evaluate(self, X_test, y_test):
        """Print and plot the confusion matrix and classification report.

        Returns:
            (confusion_matrix, classification_report) pair.
        """
        predicted = self.predict(X_test)
        cm = confusion_matrix(y_test, predicted)
        report = classification_report(y_test, predicted)
        print("Confusion Matrix:")
        print(cm)
        print("\nClassification Report:")
        print(report)
        # Heatmap of the confusion matrix.
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.title('Confusion Matrix')
        plt.show()
        return cm, report

    def tune_hyperparameters(self, X_train, y_train, param_grid=None, cv=5):
        """Grid-search the pipeline and keep the best estimator.

        Returns:
            the fitted GridSearchCV object.
        """
        if param_grid is None:
            # Sensible default grid over vectorizer and classifier settings.
            param_grid = {
                'vectorizer__max_features': [2000, 5000, 10000],
                'vectorizer__ngram_range': [(1, 1), (1, 2), (1, 3)],
                'classifier__C': [0.1, 1.0, 10.0],
                'classifier__penalty': ['l2'],
                'classifier__solver': ['liblinear', 'lbfgs'],
            }
        search = GridSearchCV(self.pipeline, param_grid, cv=cv, n_jobs=-1, verbose=1)
        search.fit(X_train, y_train)
        print(f"Best Parameters: {search.best_params_}")
        print(f"Best Cross-Validation Score: {search.best_score_:.4f}")
        # Adopt the tuned pipeline for subsequent predictions.
        self.pipeline = search.best_estimator_
        return search

    def get_top_features(self, n=20):
        """Return the n most positive and most negative features (words).

        Returns:
            {'positive': [(word, coef), ...], 'negative': [...]} or None when
            the classifier exposes no coefficients.
        """
        classifier = self.pipeline.named_steps['classifier']
        feature_names = self.pipeline.named_steps['vectorizer'].get_feature_names_out()
        if not hasattr(classifier, 'coef_'):
            print("当前分类器不支持获取特征权重")
            return None
        coef = classifier.coef_[0]
        order = coef.argsort()
        # Largest coefficients first for the positive side.
        strongest_pos = [(feature_names[i], coef[i]) for i in reversed(order[-n:])]
        strongest_neg = [(feature_names[i], coef[i]) for i in order[:n]]
        return {
            'positive': strongest_pos,
            'negative': strongest_neg
        }

    def visualize_top_features(self, n=20, figsize=(15, 10)):
        """Bar-plot the strongest positive and negative features side by side."""
        top = self.get_top_features(n)
        if not top:
            return
        fig, (ax_pos, ax_neg) = plt.subplots(1, 2, figsize=figsize)
        # Positive features in green.
        ax_pos.barh([word for word, _ in top['positive']],
                    [score for _, score in top['positive']], color='green')
        ax_pos.set_title('Top Positive Features')
        ax_pos.set_xlabel('Coefficient')
        # Negative features in red, magnitudes only.
        ax_neg.barh([word for word, _ in top['negative']],
                    [abs(score) for _, score in top['negative']], color='red')
        ax_neg.set_title('Top Negative Features')
        ax_neg.set_xlabel('Absolute Coefficient')
        plt.tight_layout()
        plt.show()
# Demo with synthetic data
def create_sample_data(n_samples=1000):
    """Build a synthetic 3-class sentiment dataset.

    Labels: 1 = positive, 0 = negative, 2 = neutral. The RNG is seeded so the
    output is reproducible.

    Returns:
        (texts, labels) as a pair of shuffled numpy arrays of length n_samples.
    """
    np.random.seed(42)
    positive_words = ['good', 'great', 'excellent', 'wonderful', 'amazing', 'love', 'like', 'best', 'superb', 'fantastic']
    negative_words = ['bad', 'terrible', 'horrible', 'awful', 'disappointed', 'hate', 'dislike', 'worst', 'poor', 'pathetic']
    neutral_words = ['the', 'and', 'is', 'in', 'to', 'of', 'for', 'with', 'on', 'at']

    texts = []
    labels = []

    def emit_sentiment_samples(count, marker_words, label):
        # Random-length word soup guaranteed to contain >= 1 marker word.
        for _ in range(count):
            size = np.random.randint(5, 20)
            chosen = np.random.choice(marker_words + neutral_words, size=size)
            if not any(w in marker_words for w in chosen):
                chosen[np.random.randint(size)] = np.random.choice(marker_words)
            texts.append(' '.join(chosen))
            labels.append(label)

    third = n_samples // 3
    emit_sentiment_samples(third, positive_words, 1)   # positive
    emit_sentiment_samples(third, negative_words, 0)   # negative
    # Neutral samples fill whatever count remains.
    for _ in range(n_samples - 2 * third):
        size = np.random.randint(5, 20)
        texts.append(' '.join(np.random.choice(neutral_words, size=size)))
        labels.append(2)  # neutral
    # Shuffle texts and labels with one shared permutation.
    perm = np.arange(len(texts))
    np.random.shuffle(perm)
    return np.array(texts)[perm], np.array(labels)[perm]
def test_ml_analyzer():
    """End-to-end demo of MLBasedSentimentAnalyzer on synthetic data:
    train, evaluate, inspect features, grid-search, re-evaluate, predict."""
    # Synthetic dataset (1=positive, 0=negative, 2=neutral).
    X, y = create_sample_data(1000)
    # Hold out 20% for testing.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    print(f"训练集大小: {len(X_train)}")
    print(f"测试集大小: {len(X_test)}")
    print(f"类别分布: {np.bincount(y)}")
    # Train the default TF-IDF + logistic-regression pipeline.
    analyzer = MLBasedSentimentAnalyzer()
    analyzer.train(X_train, y_train)
    # Evaluate on the held-out set.
    print("\n模型评估结果:")
    analyzer.evaluate(X_test, y_test)
    # Plot the most informative words.
    print("\n重要特征可视化:")
    analyzer.visualize_top_features(15)
    # Small grid search, then re-evaluate the tuned pipeline.
    print("\n超参数调优:")
    param_grid = {
        'vectorizer__max_features': [2000, 5000],
        'vectorizer__ngram_range': [(1, 1), (1, 2)],
        'classifier__C': [0.1, 1.0]
    }
    analyzer.tune_hyperparameters(X_train, y_train, param_grid, cv=3)
    print("\n调优后的模型评估结果:")
    analyzer.evaluate(X_test, y_test)
    # Predict a few hand-written sentences.
    test_texts = [
        "This is a wonderful product and I love it very much.",
        "I am very disappointed with the poor quality of this item.",
        "The package arrived on time and in good condition."
    ]
    predictions = analyzer.predict(test_texts)
    probabilities = analyzer.predict_proba(test_texts)
    print("\n测试文本预测结果:")
    for i, (text, pred, prob) in enumerate(zip(test_texts, predictions, probabilities)):
        # Map the numeric label back to a human-readable tag.
        sentiment = "Positive" if pred == 1 else "Negative" if pred == 0 else "Neutral"
        print(f"\nText {i+1}: {text}")
        print(f"Predicted Sentiment: {sentiment}")
        print(f"Probabilities: Positive={prob[1]:.4f}, Negative={prob[0]:.4f}, Neutral={prob[2]:.4f}")
test_ml_analyzer()卷积神经网络(CNN)通过卷积操作提取文本的局部特征,在情感分析任务中取得了良好的效果。
基本原理:
优势:
Python实现示例(使用PyTorch实现CNN情感分析):
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
import numpy as np
from collections import Counter
import matplotlib.pyplot as plt
class CNN_SentimentAnalysis(nn.Module):
    """TextCNN classifier: embedding -> parallel Conv2d filters -> max-pool -> FC."""

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim,
                 dropout, pad_idx):
        super(CNN_SentimentAnalysis, self).__init__()
        # Token embeddings; the padding index keeps a zero vector.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        # One Conv2d per filter size, each spanning the full embedding width.
        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=n_filters,
                      kernel_size=(size, embedding_dim))
            for size in filter_sizes
        ])
        # Concatenated pooled features -> class logits.
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """text: LongTensor [batch, seq_len] -> logits [batch, output_dim]."""
        # [batch, seq, emb], then add a channel dim for Conv2d: [batch, 1, seq, emb].
        embedded = self.embedding(text).unsqueeze(1)
        pooled_features = []
        for conv in self.convs:
            # Conv + ReLU collapses the embedding axis: [batch, n_filters, seq - k + 1].
            activated = F.relu(conv(embedded)).squeeze(3)
            # Global max-pool over time: [batch, n_filters].
            pooled_features.append(F.max_pool1d(activated, activated.shape[2]).squeeze(2))
        # Concatenate features from every filter size, with dropout.
        combined = self.dropout(torch.cat(pooled_features, dim=1))
        return self.fc(combined)
class TextPreprocessor:
    """Whitespace tokenizer plus frequency-based vocabulary producing fixed-length id sequences."""

    def __init__(self, max_vocab_size=10000, max_length=100):
        self.max_vocab_size = max_vocab_size
        self.max_length = max_length
        # Reserved ids: 0 = padding, 1 = out-of-vocabulary.
        self.vocab = {'<PAD>': 0, '<UNK>': 1}
        self.reverse_vocab = {0: '<PAD>', 1: '<UNK>'}
        self.vocab_size = 2

    def build_vocab(self, texts):
        """Count lowercase whitespace tokens across `texts` and keep the most
        frequent ones (two slots are already taken by <PAD>/<UNK>).

        Returns:
            the word -> id mapping.
        """
        counts = Counter()
        for text in texts:
            counts.update(text.lower().split())
        for word, _ in counts.most_common(self.max_vocab_size - 2):
            self.vocab[word] = self.vocab_size
            self.reverse_vocab[self.vocab_size] = word
            self.vocab_size += 1
        return self.vocab

    def text_to_sequence(self, text):
        """Map `text` to a list of token ids, truncated or padded to max_length."""
        ids = [self.vocab.get(tok, self.vocab['<UNK>']) for tok in text.lower().split()]
        if len(ids) > self.max_length:
            return ids[:self.max_length]
        return ids + [self.vocab['<PAD>']] * (self.max_length - len(ids))

    def batch_text_to_tensor(self, texts):
        """Convert many texts into one LongTensor of shape [len(texts), max_length]."""
        return torch.LongTensor([self.text_to_sequence(t) for t in texts])
def train_model(model, train_loader, valid_loader, optimizer, criterion, epochs, device):
    """Train `model` for `epochs` epochs with per-epoch validation.

    Checkpoints the weights with the lowest validation loss to
    'best_cnn_model.pt' and plots loss/accuracy curves at the end.
    """
    train_losses = []
    valid_losses = []
    valid_accs = []
    best_valid_loss = float('inf')
    for epoch in range(epochs):
        # Training pass.
        model.train()
        train_loss = 0
        for texts, labels in train_loader:
            texts = texts.to(device)
            labels = labels.to(device)
            # Reset gradients before each batch.
            optimizer.zero_grad()
            # Forward pass.
            predictions = model(texts)
            loss = criterion(predictions, labels)
            # Backward pass and parameter update.
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        # Validation pass (no gradients).
        model.eval()
        valid_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for texts, labels in valid_loader:
                texts = texts.to(device)
                labels = labels.to(device)
                # Forward pass.
                predictions = model(texts)
                loss = criterion(predictions, labels)
                valid_loss += loss.item()
                # Running accuracy via argmax class.
                _, predicted = torch.max(predictions.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        # Per-epoch averages.
        train_loss_avg = train_loss / len(train_loader)
        valid_loss_avg = valid_loss / len(valid_loader)
        valid_acc = correct / total
        # Checkpoint the best model so far.
        if valid_loss_avg < best_valid_loss:
            best_valid_loss = valid_loss_avg
            torch.save(model.state_dict(), 'best_cnn_model.pt')
        # Record the learning curves.
        train_losses.append(train_loss_avg)
        valid_losses.append(valid_loss_avg)
        valid_accs.append(valid_acc)
        print(f'Epoch {epoch+1}/{epochs}, '
              f'Train Loss: {train_loss_avg:.4f}, '
              f'Valid Loss: {valid_loss_avg:.4f}, '
              f'Valid Acc: {valid_acc:.4f}')
    # Plot the training curves.
    plt.figure(figsize=(12, 5))
    # Loss subplot.
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(valid_losses, label='Valid Loss')
    plt.title('Loss vs. Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    # Accuracy subplot.
    plt.subplot(1, 2, 2)
    plt.plot(valid_accs, label='Valid Accuracy')
    plt.title('Accuracy vs. Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.tight_layout()
    plt.show()
def evaluate_model(model, test_loader, criterion, device):
    """Evaluate `model` on `test_loader`.

    Returns:
        (avg_loss, accuracy, predictions, true_labels) — the last two as
        numpy arrays in loader order.
    """
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    all_predictions = []
    all_labels = []
    with torch.no_grad():
        for texts, labels in test_loader:
            texts = texts.to(device)
            labels = labels.to(device)
            # Forward pass.
            predictions = model(texts)
            loss = criterion(predictions, labels)
            test_loss += loss.item()
            # Argmax class + running accuracy.
            _, predicted = torch.max(predictions.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            # Collect for downstream metrics.
            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    # Averages over the whole test set.
    test_loss_avg = test_loss / len(test_loader)
    test_acc = correct / total
    print(f'Test Loss: {test_loss_avg:.4f}, Test Acc: {test_acc:.4f}')
    return test_loss_avg, test_acc, np.array(all_predictions), np.array(all_labels)
def cnn_sentiment_analysis_demo():
    """End-to-end TextCNN demo: synthetic data -> train -> evaluate -> predict."""
    # Prefer GPU when available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f'Using device: {device}')
    # Build a small synthetic dataset (labels: 1=positive, 0=negative, 2=neutral).
    def create_sample_data(n_samples=1000):
        np.random.seed(42)
        positive_words = ['good', 'great', 'excellent', 'wonderful', 'amazing', 'love', 'like']
        negative_words = ['bad', 'terrible', 'horrible', 'awful', 'disappointed', 'hate', 'dislike']
        neutral_words = ['the', 'and', 'is', 'in', 'to', 'of', 'for']
        texts = []
        labels = []
        # Positive samples: random word soup with at least one positive word.
        for _ in range(n_samples // 3):
            length = np.random.randint(5, 20)
            words = np.random.choice(positive_words + neutral_words, size=length)
            if not any(word in positive_words for word in words):
                words[np.random.randint(length)] = np.random.choice(positive_words)
            text = ' '.join(words)
            texts.append(text)
            labels.append(1)  # positive
        # Negative samples, same construction.
        for _ in range(n_samples // 3):
            length = np.random.randint(5, 20)
            words = np.random.choice(negative_words + neutral_words, size=length)
            if not any(word in negative_words for word in words):
                words[np.random.randint(length)] = np.random.choice(negative_words)
            text = ' '.join(words)
            texts.append(text)
            labels.append(0)  # negative
        # Neutral samples fill the remainder.
        for _ in range(n_samples - 2 * (n_samples // 3)):
            length = np.random.randint(5, 20)
            words = np.random.choice(neutral_words, size=length)
            text = ' '.join(words)
            texts.append(text)
            labels.append(2)  # neutral
        return texts, labels
    # Data + vocabulary.
    texts, labels = create_sample_data(1000)
    preprocessor = TextPreprocessor(max_vocab_size=5000, max_length=50)
    preprocessor.build_vocab(texts)
    # Encode texts into id tensors.
    X_tensor = preprocessor.batch_text_to_tensor(texts)
    y_tensor = torch.LongTensor(labels)
    dataset = TensorDataset(X_tensor, y_tensor)
    # 70/15/15 train/valid/test split.
    train_size = int(0.7 * len(dataset))
    valid_size = int(0.15 * len(dataset))
    test_size = len(dataset) - train_size - valid_size
    train_dataset, valid_dataset, test_dataset = random_split(dataset, [train_size, valid_size, test_size])
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
    # Model hyperparameters.
    vocab_size = preprocessor.vocab_size
    embedding_dim = 100
    n_filters = 100
    filter_sizes = [3, 4, 5]
    output_dim = 3  # positive, negative, neutral
    dropout = 0.5
    pad_idx = preprocessor.vocab['<PAD>']
    # Build the TextCNN.
    model = CNN_SentimentAnalysis(
        vocab_size=vocab_size,
        embedding_dim=embedding_dim,
        n_filters=n_filters,
        filter_sizes=filter_sizes,
        output_dim=output_dim,
        dropout=dropout,
        pad_idx=pad_idx
    )
    model = model.to(device)
    # Loss and optimizer.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # Train, then reload the best checkpoint saved by train_model.
    print("开始训练模型...")
    train_model(model, train_loader, valid_loader, optimizer, criterion, epochs=10, device=device)
    model.load_state_dict(torch.load('best_cnn_model.pt', map_location=device))
    # Evaluate on the held-out test split.
    print("\n评估模型...")
    test_loss, test_acc, predictions, true_labels = evaluate_model(model, test_loader, criterion, device)
    # Predict a few hand-written sentences.
    test_texts = [
        "This product is excellent and I love it so much",
        "I hate this terrible product, it's the worst",
        "The package was delivered on time today"
    ]
    test_tensor = preprocessor.batch_text_to_tensor(test_texts)
    test_tensor = test_tensor.to(device)
    # Inference without gradients.
    model.eval()
    with torch.no_grad():
        outputs = model(test_tensor)
        _, predicted_classes = torch.max(outputs, 1)
    # Report predictions with per-class softmax confidences.
    print("\n测试文本预测结果:")
    sentiment_labels = {0: "Negative", 1: "Positive", 2: "Neutral"}
    for i, (text, pred) in enumerate(zip(test_texts, predicted_classes)):
        print(f"\nText {i+1}: {text}")
        print(f"Predicted Sentiment: {sentiment_labels[pred.item()]}")
        print(f"Confidence Scores: {F.softmax(outputs[i], dim=0).cpu().numpy()}")
# Run the CNN demo.
cnn_sentiment_analysis_demo()

# Recurrent networks (RNNs) and their long short-term memory (LSTM) variant
# capture sequential structure and long-range dependencies in text, and
# perform strongly on sentiment analysis.
基本原理:
LSTM的优势:
Python实现示例(使用PyTorch实现LSTM情感分析):
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
class LSTM_SentimentAnalysis(nn.Module):
    """(Bi)LSTM sentiment classifier: embedding -> LSTM -> FC over the final hidden state."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers,
                 bidirectional, dropout, pad_idx):
        super(LSTM_SentimentAnalysis, self).__init__()
        # Token embeddings; the padding index keeps a zero vector.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        # LSTM stack; inter-layer dropout only applies when n_layers > 1.
        self.lstm = nn.LSTM(embedding_dim,
                            hidden_dim,
                            num_layers=n_layers,
                            bidirectional=bidirectional,
                            dropout=dropout if n_layers > 1 else 0,
                            batch_first=True)
        # Bidirectional LSTMs expose 2 * hidden_dim features.
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, text_lengths=None):
        """text: LongTensor [batch, seq_len]; optional text_lengths enables packed evaluation.

        NOTE(review): recent PyTorch requires the lengths passed to
        pack_padded_sequence to be on the CPU — confirm callers pass CPU
        lengths when running on GPU.
        """
        embedded = self.dropout(self.embedding(text))
        # embedded: [batch, seq_len, emb_dim]
        if text_lengths is not None:
            # pack_padded_sequence expects sequences sorted by decreasing length.
            text_lengths, sorted_indices = text_lengths.sort(descending=True)
            embedded = embedded[sorted_indices]
            # Pack so padded positions are skipped by the LSTM.
            packed = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths, batch_first=True)
            packed_output, (hidden, cell) = self.lstm(packed)
            # Unpack back to a padded tensor.
            output, _ = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
            # Undo the sort so results line up with the original batch order.
            _, unsorted_indices = sorted_indices.sort()
            output = output[unsorted_indices]
            hidden = hidden[:, unsorted_indices, :]
        else:
            # Plain (unpacked) LSTM forward pass.
            output, (hidden, cell) = self.lstm(embedded)
        if self.lstm.bidirectional:
            # Concatenate the final forward and backward hidden states.
            hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1))
        else:
            # Final hidden state of the last layer.
            hidden = self.dropout(hidden[-1,:,:])
        # hidden: [batch, hidden_dim * num_directions]
        return self.fc(hidden)
# Reuses the TextPreprocessor class defined earlier.
def train_lstm_model(model, train_loader, valid_loader, optimizer, criterion, epochs, device):
    """Train the LSTM for `epochs` epochs with per-epoch validation.

    Checkpoints the weights with the lowest validation loss to
    'best_lstm_model.pt' and plots loss/accuracy curves at the end.
    """
    train_losses = []
    valid_losses = []
    valid_accs = []
    best_valid_loss = float('inf')
    for epoch in range(epochs):
        # Training pass.
        model.train()
        train_loss = 0
        for texts, labels in train_loader:
            texts = texts.to(device)
            labels = labels.to(device)
            # True sequence lengths (non-padding tokens; pad id is 0).
            text_lengths = torch.sum(texts != 0, dim=1)
            # Reset gradients before each batch.
            optimizer.zero_grad()
            # Forward pass.
            predictions = model(texts, text_lengths)
            loss = criterion(predictions, labels)
            # Backward pass and update.
            loss.backward()
            # Clip gradients to guard against exploding gradients.
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
            optimizer.step()
            train_loss += loss.item()
        # Validation pass (no gradients).
        model.eval()
        valid_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for texts, labels in valid_loader:
                texts = texts.to(device)
                labels = labels.to(device)
                # True sequence lengths.
                text_lengths = torch.sum(texts != 0, dim=1)
                # Forward pass.
                predictions = model(texts, text_lengths)
                loss = criterion(predictions, labels)
                valid_loss += loss.item()
                # Running accuracy via argmax class.
                _, predicted = torch.max(predictions.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        # Per-epoch averages.
        train_loss_avg = train_loss / len(train_loader)
        valid_loss_avg = valid_loss / len(valid_loader)
        valid_acc = correct / total
        # Checkpoint the best model so far.
        if valid_loss_avg < best_valid_loss:
            best_valid_loss = valid_loss_avg
            torch.save(model.state_dict(), 'best_lstm_model.pt')
        # Record the learning curves.
        train_losses.append(train_loss_avg)
        valid_losses.append(valid_loss_avg)
        valid_accs.append(valid_acc)
        print(f'Epoch {epoch+1}/{epochs}, '
              f'Train Loss: {train_loss_avg:.4f}, '
              f'Valid Loss: {valid_loss_avg:.4f}, '
              f'Valid Acc: {valid_acc:.4f}')
    # Plot the training curves.
    plt.figure(figsize=(12, 5))
    # Loss subplot.
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(valid_losses, label='Valid Loss')
    plt.title('Loss vs. Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    # Accuracy subplot.
    plt.subplot(1, 2, 2)
    plt.plot(valid_accs, label='Valid Accuracy')
    plt.title('Accuracy vs. Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.tight_layout()
    plt.show()
def evaluate_lstm_model(model, test_loader, criterion, device):
    """Evaluate the LSTM on `test_loader`.

    Returns:
        (avg_loss, accuracy, predictions, true_labels) — the last two as
        numpy arrays in loader order.
    """
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    all_predictions = []
    all_labels = []
    with torch.no_grad():
        for texts, labels in test_loader:
            texts = texts.to(device)
            labels = labels.to(device)
            # True sequence lengths (pad id is 0).
            text_lengths = torch.sum(texts != 0, dim=1)
            # Forward pass.
            predictions = model(texts, text_lengths)
            loss = criterion(predictions, labels)
            test_loss += loss.item()
            # Argmax class + running accuracy.
            _, predicted = torch.max(predictions.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            # Collect for downstream metrics.
            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    # Averages over the whole test set.
    test_loss_avg = test_loss / len(test_loader)
    test_acc = correct / total
    print(f'Test Loss: {test_loss_avg:.4f}, Test Acc: {test_acc:.4f}')
    return test_loss_avg, test_acc, np.array(all_predictions), np.array(all_labels)
def lstm_sentiment_analysis_demo():
# 检查是否有可用的GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
# 创建示例数据
def create_sample_data(n_samples=1000):
np.random.seed(42)
positive_words = ['good', 'great', 'excellent', 'wonderful', 'amazing', 'love', 'like']
negative_words = ['bad', 'terrible', 'horrible', 'awful', 'disappointed', 'hate', 'dislike']
neutral_words = ['the', 'and', 'is', 'in', 'to', 'of', 'for']
# 创建更复杂的句子,包含否定词和程度词
negation_words = ['not', 'never', 'no']
degree_words = ['very', 'extremely', 'quite', 'really']
texts = []
labels = []
# 生成积极样本
for _ in range(n_samples // 3):
# 基础积极句子
base_length = np.random.randint(3, 8)
base_words = np.random.choice(positive_words + neutral_words, size=base_length)
if not any(word in positive_words for word in base_words):
base_words[np.random.randint(base_length)] = np.random.choice(positive_words)
# 可能添加程度词
if np.random.random() > 0.5:
degree_word = np.random.choice(degree_words)
# 找到第一个积极词并在其前添加程度词
for i, word in enumerate(base_words):
if word in positive_words:
base_words = np.insert(base_words, i, degree_word)
break
text = ' '.join(base_words)
texts.append(text)
labels.append(1) # 积极
# 生成消极样本
for _ in range(n_samples // 3):
# 基础消极句子
base_length = np.random.randint(3, 8)
base_words = np.random.choice(negative_words + neutral_words, size=base_length)
if not any(word in negative_words for word in base_words):
base_words[np.random.randint(base_length)] = np.random.choice(negative_words)
# 可能添加程度词
if np.random.random() > 0.5:
degree_word = np.random.choice(degree_words)
# 找到第一个消极词并在其前添加程度词
for i, word in enumerate(base_words):
if word in negative_words:
base_words = np.insert(base_words, i, degree_word)
break
text = ' '.join(base_words)
texts.append(text)
labels.append(0) # 消极
# 生成中性样本
for _ in range(n_samples // 6):
length = np.random.randint(5, 15)
words = np.random.choice(neutral_words, size=length)
text = ' '.join(words)
texts.append(text)
labels.append(2) # 中性
# 生成包含否定词的复杂样本(可能翻转情感)
for _ in range(n_samples // 6):
# 基础情感句子
sentiment = np.random.choice([0, 1]) # 0:消极, 1:积极
sentiment_words = negative_words if sentiment == 0 else positive_words
base_length = np.random.randint(3, 8)
base_words = np.random.choice(sentiment_words + neutral_words, size=base_length)
if not any(word in sentiment_words for word in base_words):
base_words[np.random.randint(base_length)] = np.random.choice(sentiment_words)
# 添加否定词
negation_word = np.random.choice(negation_words)
# 在第一个情感词前添加否定词
for i, word in enumerate(base_words):
if word in sentiment_words:
base_words = np.insert(base_words, i, negation_word)
break
text = ' '.join(base_words)
texts.append(text)
# 情感可能被翻转
if np.random.random() > 0.3: # 70%的概率翻转情感
labels.append(1 - sentiment) # 翻转情感
else:
labels.append(sentiment) # 不翻转
# 打乱数据
indices = np.arange(len(texts))
np.random.shuffle(indices)
return np.array(texts)[indices], np.array(labels)[indices]
# 创建数据
texts, labels = create_sample_data(1000)
# 初始化文本预处理器并构建词汇表
class TextPreprocessor:
    """Vocabulary builder and text-to-index converter.

    Reserves id 0 for '<PAD>' and id 1 for '<UNK>'; all remaining ids are
    assigned by descending corpus frequency (ties broken by first occurrence).
    """

    def __init__(self, max_vocab_size=10000, max_length=50):
        self.max_vocab_size = max_vocab_size
        self.max_length = max_length
        # The two special tokens always occupy the first vocabulary slots.
        self.vocab = {'<PAD>': 0, '<UNK>': 1}
        self.reverse_vocab = {0: '<PAD>', 1: '<UNK>'}
        self.vocab_size = 2

    def build_vocab(self, texts):
        """Count lowercased whitespace tokens and keep the most frequent words."""
        counts = Counter(tok for text in texts for tok in text.lower().split())
        # Two slots are already taken by <PAD> and <UNK>.
        for word, _ in counts.most_common(self.max_vocab_size - 2):
            idx = self.vocab_size
            self.vocab[word] = idx
            self.reverse_vocab[idx] = word
            self.vocab_size = idx + 1
        return self.vocab

    def text_to_sequence(self, text):
        """Map a text to a fixed-length (max_length) list of vocabulary ids."""
        unk = self.vocab['<UNK>']
        seq = [self.vocab.get(tok, unk) for tok in text.lower().split()]
        # Truncate long sequences; right-pad short ones with <PAD>.
        seq = seq[:self.max_length]
        seq.extend([self.vocab['<PAD>']] * (self.max_length - len(seq)))
        return seq

    def batch_text_to_tensor(self, texts):
        """Convert a list of texts into a LongTensor of shape [batch, max_length]."""
        return torch.LongTensor([self.text_to_sequence(t) for t in texts])
preprocessor = TextPreprocessor(max_vocab_size=5000, max_length=50)
preprocessor.build_vocab(texts)
# 将文本转换为张量
X_tensor = preprocessor.batch_text_to_tensor(texts)
y_tensor = torch.LongTensor(labels)
# 创建数据集
dataset = TensorDataset(X_tensor, y_tensor)
# 分割数据集
train_size = int(0.7 * len(dataset))
valid_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - valid_size
train_dataset, valid_dataset, test_dataset = random_split(dataset, [train_size, valid_size, test_size])
# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
# 模型参数
vocab_size = preprocessor.vocab_size
embedding_dim = 100
hidden_dim = 256
output_dim = 3 # 积极、消极、中性
n_layers = 2
bidirectional = True # 双向LSTM
dropout = 0.5
pad_idx = preprocessor.vocab['<PAD>']
# 初始化模型
model = LSTM_SentimentAnalysis(
vocab_size=vocab_size,
embedding_dim=embedding_dim,
hidden_dim=hidden_dim,
output_dim=output_dim,
n_layers=n_layers,
bidirectional=bidirectional,
dropout=dropout,
pad_idx=pad_idx
)
# 将模型移至设备
model = model.to(device)
# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 训练模型
print("开始训练LSTM模型...")
train_lstm_model(model, train_loader, valid_loader, optimizer, criterion, epochs=15, device=device)
# 加载最佳模型
model.load_state_dict(torch.load('best_lstm_model.pt', map_location=device))
# 评估模型
print("\n评估LSTM模型...")
test_loss, test_acc, predictions, true_labels = evaluate_lstm_model(model, test_loader, criterion, device)
# 测试复杂情感文本
test_texts = [
"This product is very good and I love it",
"I hate this terrible product",
"The package arrived on time",
"This is not bad at all, it's actually quite good",
"The service was extremely disappointing but the product quality is excellent"
]
# 将测试文本转换为张量
test_tensor = preprocessor.batch_text_to_tensor(test_texts)
test_tensor = test_tensor.to(device)
# 预测
model.eval()
with torch.no_grad():
text_lengths = torch.sum(test_tensor != 0, dim=1)
outputs = model(test_tensor, text_lengths)
_, predicted_classes = torch.max(outputs, 1)
# 输出结果
print("\n复杂测试文本预测结果:")
sentiment_labels = {0: "Negative", 1: "Positive", 2: "Neutral"}
for i, (text, pred) in enumerate(zip(test_texts, predicted_classes)):
print(f"\nText {i+1}: {text}")
print(f"Predicted Sentiment: {sentiment_labels[pred.item()]}")
print(f"Confidence: {torch.softmax(outputs[i], dim=0).max().item():.4f}")
# 运行示例
lstm_sentiment_analysis_demo()

Transformer模型通过自注意力机制能够有效捕获文本中的长距离依赖关系,在情感分析任务中取得了当前最优的性能。
基本原理:
优势:
Python实现示例(使用PyTorch实现简化版Transformer情感分析):
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
import math
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to token embeddings.

    Follows the encoding of "Attention Is All You Need": even feature
    indices carry sin components, odd indices carry cos components.
    """

    def __init__(self, embedding_dim, max_len=5000):
        super(PositionalEncoding, self).__init__()
        positions = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # Geometric frequency schedule over the feature dimension.
        freqs = torch.exp(torch.arange(0, embedding_dim, 2).float() *
                          (-math.log(10000.0) / embedding_dim))
        table = torch.zeros(max_len, embedding_dim)
        table[:, 0::2] = torch.sin(positions * freqs)
        table[:, 1::2] = torch.cos(positions * freqs)
        # Registered as a buffer (not a parameter): it follows .to(device)
        # but is never updated by the optimizer. Leading dim broadcasts
        # over the batch axis.
        self.register_buffer('pe', table.unsqueeze(0))

    def forward(self, x):
        # x: [batch, seq_len, embedding_dim]; slice the table to seq_len.
        return x + self.pe[:, :x.size(1), :].requires_grad_(False)
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention split across several heads."""

    def __init__(self, embedding_dim, n_heads, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        self.embedding_dim = embedding_dim
        self.n_heads = n_heads
        self.head_dim = embedding_dim // n_heads
        assert self.head_dim * n_heads == embedding_dim, "Embedding dimension must be divisible by number of heads"
        # One projection per role; heads are split out of the feature axis.
        self.query = nn.Linear(embedding_dim, embedding_dim)
        self.key = nn.Linear(embedding_dim, embedding_dim)
        self.value = nn.Linear(embedding_dim, embedding_dim)
        self.out = nn.Linear(embedding_dim, embedding_dim)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, proj, batch_size):
        # [batch, seq, dim] -> [batch, heads, seq, head_dim]
        return proj.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)

    def forward(self, q, k, v, mask=None):
        """Return (attended output, attention weights after dropout)."""
        batch_size = q.size(0)
        q = self._split_heads(self.query(q), batch_size)
        k = self._split_heads(self.key(k), batch_size)
        v = self._split_heads(self.value(v), batch_size)
        # Similarity of every query against every key, scaled by sqrt(d_k).
        scores = torch.matmul(q, k.permute(0, 1, 3, 2)) / math.sqrt(self.head_dim)
        if mask is not None:
            # Positions where mask == 0 are excluded from the softmax.
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = self.dropout(torch.softmax(scores, dim=-1))
        context = torch.matmul(attn, v)
        # [batch, heads, seq, head_dim] -> [batch, seq, dim]
        context = context.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.embedding_dim)
        return self.out(context), attn
class FeedForward(nn.Module):
    """Position-wise two-layer MLP with GELU activation."""

    def __init__(self, embedding_dim, ff_dim=2048, dropout=0.1):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(embedding_dim, ff_dim)
        self.linear2 = nn.Linear(ff_dim, embedding_dim)
        self.dropout = nn.Dropout(dropout)
        self.activation = nn.GELU()

    def forward(self, x):
        # Expand -> GELU -> dropout -> project back to embedding_dim.
        hidden = self.dropout(self.activation(self.linear1(x)))
        return self.linear2(hidden)
class TransformerBlock(nn.Module):
    """One post-norm Transformer encoder layer.

    Self-attention and a feed-forward MLP, each wrapped in a residual
    connection followed by LayerNorm.
    """

    def __init__(self, embedding_dim, n_heads, ff_dim=2048, dropout=0.1):
        super(TransformerBlock, self).__init__()
        self.attention = MultiHeadAttention(embedding_dim, n_heads, dropout)
        self.feed_forward = FeedForward(embedding_dim, ff_dim, dropout)
        self.ln1 = nn.LayerNorm(embedding_dim)
        self.ln2 = nn.LayerNorm(embedding_dim)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # Self-attention sub-layer; the attention weights are discarded.
        attended, _ = self.attention(x, x, x, mask)
        x = self.ln1(x + self.dropout1(attended))
        # Feed-forward sub-layer.
        x = self.ln2(x + self.dropout2(self.feed_forward(x)))
        return x
class TokenAndPositionEmbedding(nn.Module):
    """Looks up token embeddings and adds sinusoidal position encodings."""

    def __init__(self, vocab_size, embedding_dim, max_len=512):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_embedding = nn.Embedding(vocab_size, embedding_dim)
        self.position_encoding = PositionalEncoding(embedding_dim, max_len)

    def forward(self, x):
        # x: [batch, seq_len] of token ids -> [batch, seq_len, embedding_dim]
        return self.position_encoding(self.token_embedding(x))
class TransformerSentimentAnalysis(nn.Module):
    """Transformer encoder classifier for 3-way sentiment prediction.

    The hidden state of the first token is used as the sequence
    representation (analogous to BERT's [CLS] token) and projected to
    class logits.
    """

    def __init__(self, vocab_size, embedding_dim, n_heads, n_layers,
                 ff_dim=2048, dropout=0.1, output_dim=3, max_len=512):
        super(TransformerSentimentAnalysis, self).__init__()
        # Token lookup + positional encoding.
        self.embedding = TokenAndPositionEmbedding(vocab_size, embedding_dim, max_len)
        # A stack of identical encoder blocks.
        self.transformer_layers = nn.ModuleList(
            [TransformerBlock(embedding_dim, n_heads, ff_dim, dropout)
             for _ in range(n_layers)]
        )
        self.classifier = nn.Linear(embedding_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        hidden = self.dropout(self.embedding(x))
        for block in self.transformer_layers:
            hidden = block(hidden, mask)
        # First-token pooling, then project to logits.
        return self.classifier(hidden[:, 0, :])
def create_padding_mask(seq, pad_token=0):
    """Build a broadcastable attention mask hiding padding positions.

    Returns a bool tensor of shape [batch_size, 1, 1, seq_len] that is
    True for real tokens and False where seq equals pad_token, ready to
    broadcast over the [batch, heads, query, key] score tensor.
    """
    keep = seq != pad_token
    return keep.unsqueeze(1).unsqueeze(2)
def train_transformer_model(model, train_loader, valid_loader, optimizer, criterion, epochs, device):
    """Train the Transformer sentiment model with per-epoch validation.

    The checkpoint with the lowest validation loss is written to
    'best_transformer_model.pt'; loss/accuracy curves are plotted at the end.

    Args:
        model: module called as model(texts, mask) returning class logits.
        train_loader, valid_loader: DataLoaders yielding (texts, labels) batches.
        optimizer: optimizer over model.parameters().
        criterion: loss on raw logits, e.g. nn.CrossEntropyLoss.
        epochs: number of full passes over train_loader.
        device: torch.device for model and tensors.
    """
    train_losses = []
    valid_losses = []
    valid_accs = []
    best_valid_loss = float('inf')
    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0
        for texts, labels in train_loader:
            texts = texts.to(device)
            labels = labels.to(device)
            # Build the attention mask that hides padding tokens
            mask = create_padding_mask(texts)
            mask = mask.to(device)
            # Reset accumulated gradients
            optimizer.zero_grad()
            # Forward pass
            predictions = model(texts, mask)
            loss = criterion(predictions, labels)
            # Backward pass and parameter update
            loss.backward()
            # Clip the gradient norm to 1 to stabilize training
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
            optimizer.step()
            train_loss += loss.item()
        # Validation phase
        model.eval()
        valid_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for texts, labels in valid_loader:
                texts = texts.to(device)
                labels = labels.to(device)
                # Same padding mask as in training
                mask = create_padding_mask(texts)
                mask = mask.to(device)
                # Forward pass only
                predictions = model(texts, mask)
                loss = criterion(predictions, labels)
                valid_loss += loss.item()
                # Count correct top-1 predictions
                _, predicted = torch.max(predictions.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        # Per-epoch average losses and validation accuracy
        train_loss_avg = train_loss / len(train_loader)
        valid_loss_avg = valid_loss / len(valid_loader)
        valid_acc = correct / total
        # Keep the best checkpoint by validation loss
        if valid_loss_avg < best_valid_loss:
            best_valid_loss = valid_loss_avg
            torch.save(model.state_dict(), 'best_transformer_model.pt')
        # Record history for plotting
        train_losses.append(train_loss_avg)
        valid_losses.append(valid_loss_avg)
        valid_accs.append(valid_acc)
        print(f'Epoch {epoch+1}/{epochs}, '
              f'Train Loss: {train_loss_avg:.4f}, '
              f'Valid Loss: {valid_loss_avg:.4f}, '
              f'Valid Acc: {valid_acc:.4f}')
    # Plot the training curves
    plt.figure(figsize=(12, 5))
    # Loss curves
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(valid_losses, label='Valid Loss')
    plt.title('Loss vs. Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    # Validation accuracy curve
    plt.subplot(1, 2, 2)
    plt.plot(valid_accs, label='Valid Accuracy')
    plt.title('Accuracy vs. Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.tight_layout()
    plt.show()
def evaluate_transformer_model(model, test_loader, criterion, device):
    """Compute and print average loss and accuracy on the test split.

    Returns (mean_batch_loss, accuracy).
    """
    model.eval()
    total_loss = 0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for texts, labels in test_loader:
            texts = texts.to(device)
            labels = labels.to(device)
            # Hide padding positions from the attention.
            mask = create_padding_mask(texts).to(device)
            logits = model(texts, mask)
            total_loss += criterion(logits, labels).item()
            # Top-1 accuracy bookkeeping.
            n_correct += (logits.argmax(dim=1) == labels).sum().item()
            n_seen += labels.size(0)
    test_loss_avg = total_loss / len(test_loader)
    test_acc = n_correct / n_seen
    print(f'Test Loss: {test_loss_avg:.4f}, Test Acc: {test_acc:.4f}')
    return test_loss_avg, test_acc
def transformer_sentiment_analysis():
# 检查是否有可用的GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
# 创建示例数据
def create_sample_data(n_samples=1000):
    """Generate a synthetic 3-class sentiment corpus.

    Roughly a quarter each of positive, negative, neutral, and
    contrastive ("but/however") samples; the label of a contrastive
    sample follows the clause AFTER the conjunction. Returns shuffled
    (texts, labels) numpy arrays (labels: 0=negative, 1=positive, 2=neutral).
    """
    np.random.seed(42)  # reproducible sampling
    positive_words = ['good', 'great', 'excellent', 'wonderful', 'amazing', 'love', 'like']
    negative_words = ['bad', 'terrible', 'horrible', 'awful', 'disappointed', 'hate', 'dislike']
    neutral_words = ['the', 'and', 'is', 'in', 'to', 'of', 'for']
    # Building blocks for complex sentences: negations, intensifiers, contrastive conjunctions
    negation_words = ['not', 'never', 'no']
    degree_words = ['very', 'extremely', 'quite', 'really']
    conjunction_words = ['but', 'however', 'yet', 'although', 'though']
    texts = []
    labels = []
    # Positive samples
    for _ in range(n_samples // 4):
        # Base positive sentence; force at least one positive word in
        base_length = np.random.randint(5, 15)
        base_words = np.random.choice(positive_words + neutral_words, size=base_length)
        if not any(word in positive_words for word in base_words):
            base_words[np.random.randint(base_length)] = np.random.choice(positive_words)
        # Optionally insert an intensifier before the first positive word
        if np.random.random() > 0.5:
            degree_word = np.random.choice(degree_words)
            for i, word in enumerate(base_words):
                if word in positive_words:
                    base_words = np.insert(base_words, i, degree_word)
                    break
        text = ' '.join(base_words)
        texts.append(text)
        labels.append(1)  # positive
    # Negative samples
    for _ in range(n_samples // 4):
        # Base negative sentence; force at least one negative word in
        base_length = np.random.randint(5, 15)
        base_words = np.random.choice(negative_words + neutral_words, size=base_length)
        if not any(word in negative_words for word in base_words):
            base_words[np.random.randint(base_length)] = np.random.choice(negative_words)
        # Optionally insert an intensifier before the first negative word
        if np.random.random() > 0.5:
            degree_word = np.random.choice(degree_words)
            for i, word in enumerate(base_words):
                if word in negative_words:
                    base_words = np.insert(base_words, i, degree_word)
                    break
        text = ' '.join(base_words)
        texts.append(text)
        labels.append(0)  # negative
    # Neutral samples (function words only)
    for _ in range(n_samples // 4):
        length = np.random.randint(5, 15)
        words = np.random.choice(neutral_words, size=length)
        text = ' '.join(words)
        texts.append(text)
        labels.append(2)  # neutral
    # Contrastive samples containing a conjunction
    for _ in range(n_samples // 4):
        # Sentiment of the first clause
        sentiment1 = np.random.choice([0, 1])
        sentiment_words1 = negative_words if sentiment1 == 0 else positive_words
        part1_length = np.random.randint(3, 8)
        part1 = np.random.choice(sentiment_words1 + neutral_words, size=part1_length)
        if not any(word in sentiment_words1 for word in part1):
            part1[np.random.randint(part1_length)] = np.random.choice(sentiment_words1)
        # Contrastive conjunction between the clauses
        conjunction = np.random.choice(conjunction_words)
        # Second clause carries the opposite sentiment
        sentiment2 = 1 - sentiment1
        sentiment_words2 = negative_words if sentiment2 == 0 else positive_words
        part2_length = np.random.randint(3, 8)
        part2 = np.random.choice(sentiment_words2 + neutral_words, size=part2_length)
        if not any(word in sentiment_words2 for word in part2):
            part2[np.random.randint(part2_length)] = np.random.choice(sentiment_words2)
        # Join the clauses into one sentence
        text = ' '.join(list(part1) + [conjunction] + list(part2))
        texts.append(text)
        # The post-conjunction sentiment dominates, so it becomes the label
        labels.append(sentiment2)
    # Shuffle texts and labels together with one index permutation
    indices = np.arange(len(texts))
    np.random.shuffle(indices)
    return np.array(texts)[indices], np.array(labels)[indices]
# 创建数据
texts, labels = create_sample_data(1000)
# 文本预处理
class TextPreprocessor:
    """Frequency-based vocabulary and fixed-length id-sequence encoder.

    Ids 0 and 1 are reserved for '<PAD>' and '<UNK>'; remaining ids are
    assigned to words by descending frequency.
    """

    def __init__(self, max_vocab_size=10000, max_length=50):
        self.max_vocab_size = max_vocab_size
        self.max_length = max_length
        # Special tokens claim the first two ids.
        self.vocab = {'<PAD>': 0, '<UNK>': 1}
        self.reverse_vocab = {0: '<PAD>', 1: '<UNK>'}
        self.vocab_size = 2

    def build_vocab(self, texts):
        """Populate the vocabulary from lowercased whitespace tokens."""
        frequencies = Counter(w for text in texts for w in text.lower().split())
        # Leave room for the two reserved tokens.
        for word, _ in frequencies.most_common(self.max_vocab_size - 2):
            self.vocab[word] = self.vocab_size
            self.reverse_vocab[self.vocab_size] = word
            self.vocab_size += 1
        return self.vocab

    def text_to_sequence(self, text):
        """Encode one text as exactly max_length vocabulary ids."""
        unk_id = self.vocab['<UNK>']
        ids = [self.vocab.get(w, unk_id) for w in text.lower().split()]
        # Clip to max_length, then pad the remainder with <PAD>.
        ids = ids[:self.max_length]
        ids += [self.vocab['<PAD>']] * (self.max_length - len(ids))
        return ids

    def batch_text_to_tensor(self, texts):
        """Encode a batch of texts as a [batch, max_length] LongTensor."""
        return torch.LongTensor([self.text_to_sequence(t) for t in texts])
preprocessor = TextPreprocessor(max_vocab_size=5000, max_length=50)
preprocessor.build_vocab(texts)
# 将文本转换为张量
X_tensor = preprocessor.batch_text_to_tensor(texts)
y_tensor = torch.LongTensor(labels)
# 创建数据集
dataset = TensorDataset(X_tensor, y_tensor)
# 分割数据集
train_size = int(0.7 * len(dataset))
valid_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - valid_size
train_dataset, valid_dataset, test_dataset = random_split(dataset, [train_size, valid_size, test_size])
# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# 模型参数
vocab_size = preprocessor.vocab_size
embedding_dim = 128
n_heads = 4
n_layers = 2
ff_dim = 512
output_dim = 3 # 积极、消极、中性
dropout = 0.3
max_len = 50
# 初始化模型
model = TransformerSentimentAnalysis(
vocab_size=vocab_size,
embedding_dim=embedding_dim,
n_heads=n_heads,
n_layers=n_layers,
ff_dim=ff_dim,
dropout=dropout,
output_dim=output_dim,
max_len=max_len
)
# 将模型移至设备
model = model.to(device)
# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.999), eps=1e-8)
# 训练模型
print("开始训练Transformer模型...")
train_transformer_model(model, train_loader, valid_loader, optimizer, criterion, epochs=10, device=device)
# 加载最佳模型
model.load_state_dict(torch.load('best_transformer_model.pt', map_location=device))
# 评估模型
print("\n评估Transformer模型...")
test_loss, test_acc = evaluate_transformer_model(model, test_loader, criterion, device)
# 测试复杂文本
test_texts = [
"I absolutely love this product, it's fantastic and works perfectly",
"This is the worst experience I've ever had with a company",
"The delivery was on time but the product quality is terrible",
"Although the packaging was damaged, the product itself works well",
"The service was not very good at first, but they improved a lot"
]
# 将测试文本转换为张量
test_tensor = preprocessor.batch_text_to_tensor(test_texts)
test_tensor = test_tensor.to(device)
# 创建掩码
mask = create_padding_mask(test_tensor)
mask = mask.to(device)
# 预测
model.eval()
with torch.no_grad():
outputs = model(test_tensor, mask)
_, predicted_classes = torch.max(outputs, 1)
# 输出结果
print("\n复杂测试文本预测结果:")
sentiment_labels = {0: "Negative", 1: "Positive", 2: "Neutral"}
for i, (text, pred) in enumerate(zip(test_texts, predicted_classes)):
print(f"\nText {i+1}: {text}")
print(f"Predicted Sentiment: {sentiment_labels[pred.item()]}")
probs = torch.softmax(outputs[i], dim=0)
print(f"Probabilities: Positive={probs[1]:.4f}, Negative={probs[0]:.4f}, Neutral={probs[2]:.4f}")
# 运行示例
transformer_sentiment_analysis()

预训练语言模型通过在大规模语料上进行预训练,学习到丰富的语言表示,在情感分析任务中取得了显著的性能提升。
BERT(Bidirectional Encoder Representations from Transformers)是一种双向Transformer编码器模型,通过掩码语言模型和下一句预测任务进行预训练。
BERT在情感分析中的优势:
常用BERT变体:
Python实现示例(使用Hugging Face Transformers实现BERT情感分析):
# 安装必要的库: pip install transformers datasets torch evaluate
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from datasets import load_dataset, DatasetDict
import evaluate
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
def bert_sentiment_analysis():
# 检查设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")
# 步骤1: 准备数据集
# 这里使用一个简单的示例,实际应用中可以加载自己的数据集
def create_sample_dataset():
    """Build a tiny 3-class sentiment DatasetDict (train/validation/test).

    Labels: 1 = positive, 0 = negative, 2 = neutral; splits are 10/2/3
    samples, for demonstration only.

    Fix: the original passed in-memory lists as ``data_files`` to
    ``load_dataset('csv', ...)``, which expects file paths and fails at
    runtime (acknowledged by the note that followed this function).
    ``Dataset.from_dict`` is the correct way to wrap in-memory data.
    """
    # Example data for each class
    positive_samples = [
        "I love this product, it's amazing!",
        "The service was excellent and the staff were very friendly.",
        "This movie was fantastic, I highly recommend it.",
        "Great experience overall, will definitely come back.",
        "The food was delicious and the ambiance was wonderful."
    ]
    negative_samples = [
        "Terrible product, doesn't work as advertised.",
        "I'm very disappointed with the quality of this item.",
        "The service was horrible and the staff were rude.",
        "Worst experience ever, would not recommend.",
        "The food was cold and tasted awful."
    ]
    neutral_samples = [
        "The product arrived on time and in good condition.",
        "The movie was okay, not great but not terrible.",
        "The service was average, nothing special.",
        "It was an uneventful experience.",
        "The food was acceptable but not outstanding."
    ]
    # Combine classes and build parallel label list
    texts = positive_samples + negative_samples + neutral_samples
    labels = [1] * len(positive_samples) + [0] * len(negative_samples) + [2] * len(neutral_samples)
    # Wrap the in-memory lists as HF datasets (local import mirrors the
    # file's existing usage of `datasets`)
    from datasets import Dataset
    dataset = DatasetDict({
        'train': Dataset.from_dict({'text': texts[:10], 'label': labels[:10]}),
        'validation': Dataset.from_dict({'text': texts[10:12], 'label': labels[10:12]}),
        'test': Dataset.from_dict({'text': texts[12:], 'label': labels[12:]})
    })
    return dataset
# 注意:上面的create_sample_dataset函数有问题,我们直接使用简单的数据结构
def create_simple_dataset():
    """Build a 15-sentence English sentiment DatasetDict via Dataset.from_dict.

    Labels: 1 = positive, 0 = negative, 2 = neutral. The 10/2/3
    train/validation/test split is for demonstration only.
    """
    # Example data for each class
    positive_samples = [
        "I love this product, it's amazing!",
        "The service was excellent and the staff were very friendly.",
        "This movie was fantastic, I highly recommend it.",
        "Great experience overall, will definitely come back.",
        "The food was delicious and the ambiance was wonderful."
    ]
    negative_samples = [
        "Terrible product, doesn't work as advertised.",
        "I'm very disappointed with the quality of this item.",
        "The service was horrible and the staff were rude.",
        "Worst experience ever, would not recommend.",
        "The food was cold and tasted awful."
    ]
    neutral_samples = [
        "The product arrived on time and in good condition.",
        "The movie was okay, not great but not terrible.",
        "The service was average, nothing special.",
        "It was an uneventful experience.",
        "The food was acceptable but not outstanding."
    ]
    # Combine classes and build parallel label list
    texts = positive_samples + negative_samples + neutral_samples
    labels = [1] * len(positive_samples) + [0] * len(negative_samples) + [2] * len(neutral_samples)
    # Wrap the in-memory lists as HF datasets
    from datasets import Dataset
    train_dataset = Dataset.from_dict({'text': texts[:10], 'label': labels[:10]})
    validation_dataset = Dataset.from_dict({'text': texts[10:12], 'label': labels[10:12]})
    test_dataset = Dataset.from_dict({'text': texts[12:], 'label': labels[12:]})
    dataset = DatasetDict({
        'train': train_dataset,
        'validation': validation_dataset,
        'test': test_dataset
    })
    return dataset
# 使用简单方法创建数据集
dataset = create_simple_dataset()
print(f"数据集大小: 训练集={len(dataset['train'])}, 验证集={len(dataset['validation'])}, 测试集={len(dataset['test'])}")
# 步骤2: 加载预训练模型和分词器
model_name = "bert-base-uncased" # 对于英文文本
# 对于中文文本,可以使用: "bert-base-chinese"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(
model_name,
num_labels=3 # 3个类别:消极、积极、中性
)
# 将模型移至设备
model = model.to(device)
# 步骤3: 数据预处理函数
def preprocess_function(examples):
    """Tokenize a batch of texts to fixed-length (128) BERT inputs.

    Uses the `tokenizer` from the enclosing scope; truncates long texts
    and pads everything to max_length=128.
    """
    return tokenizer(examples['text'], truncation=True, padding='max_length', max_length=128)
# 预处理数据集
tokenized_datasets = dataset.map(preprocess_function, batched=True)
# 步骤4: 设置评估指标
def compute_metrics(eval_pred):
    """Compute accuracy and weighted precision/recall/F1 for the Trainer.

    `eval_pred` is the (logits, labels) pair supplied by
    transformers.Trainer; metrics come from the HF `evaluate` hub.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    # Load each metric from the evaluate hub.
    loaded = {name: evaluate.load(name) for name in ("accuracy", "precision", "recall", "f1")}
    scores = {
        "accuracy": loaded["accuracy"].compute(
            predictions=predictions, references=labels)["accuracy"]
    }
    # Weighted averaging handles the (possibly unbalanced) 3-class labels.
    for name in ("precision", "recall", "f1"):
        scores[name] = loaded[name].compute(
            predictions=predictions, references=labels, average="weighted")[name]
    return scores
# 步骤5: 设置训练参数
training_args = TrainingArguments(
output_dir="./bert_sentiment_analysis",
learning_rate=2e-5,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
num_train_epochs=3,
weight_decay=0.01,
evaluation_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
push_to_hub=False,
)
# 步骤6: 创建Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_datasets["train"],
eval_dataset=tokenized_datasets["validation"],
tokenizer=tokenizer,
compute_metrics=compute_metrics,
)
# 步骤7: 训练模型
print("开始训练模型...")
trainer.train()
# 步骤8: 评估模型
print("评估模型...")
eval_results = trainer.evaluate()
print(f"评估结果: {eval_results}")
# 步骤9: 在测试集上预测
print("在测试集上预测...")
predictions = trainer.predict(tokenized_datasets["test"])
predicted_labels = np.argmax(predictions.predictions, axis=-1)
true_labels = predictions.label_ids
# 打印分类报告
print("\n分类报告:")
target_names = ["Negative", "Positive", "Neutral"]
report = classification_report(true_labels, predicted_labels, target_names=target_names)
print(report)
# 绘制混淆矩阵
cm = confusion_matrix(true_labels, predicted_labels)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=target_names, yticklabels=target_names)
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()
# 步骤10: 保存模型
print("保存模型...")
trainer.save_model("./bert_sentiment_analysis_best")
tokenizer.save_pretrained("./bert_sentiment_analysis_best")
# 步骤11: 使用模型进行推理
def predict_sentiment(texts, model, tokenizer, device):
    """Run one-by-one inference and return label/confidence per input text.

    NOTE(review): maps class ids through `target_names` from the
    enclosing scope (["Negative", "Positive", "Neutral"]); returns a list
    of dicts with keys text/sentiment/confidence/probabilities.
    """
    model.eval()
    results = []
    for text in texts:
        # Tokenize to fixed-length (128) tensors
        inputs = tokenizer(text, truncation=True, padding='max_length', max_length=128, return_tensors="pt")
        inputs = {k: v.to(device) for k, v in inputs.items()}
        # Inference without gradient tracking
        with torch.no_grad():
            outputs = model(**inputs)
            logits = outputs.logits
            probabilities = torch.nn.functional.softmax(logits, dim=1)
            predicted_class = torch.argmax(probabilities, dim=1).item()
        # Map the class id to a human-readable sentiment label
        sentiment = target_names[predicted_class]
        confidence = probabilities[0, predicted_class].item()
        results.append({
            'text': text,
            'sentiment': sentiment,
            'confidence': confidence,
            'probabilities': probabilities.cpu().numpy()[0].tolist()
        })
    return results
# 测试新文本
test_texts = [
"This product is really good, I'm very satisfied with it.",
"I'm so disappointed, this is not what I expected.",
"The weather today is nice and sunny.",
"The service was terrible but the food was delicious.",
"I don't like it, but it's not the worst thing ever."
]
print("\n测试新文本:")
results = predict_sentiment(test_texts, model, tokenizer, device)
for i, result in enumerate(results):
print(f"\n文本 {i+1}: {result['text']}")
print(f"预测情感: {result['sentiment']}")
print(f"置信度: {result['confidence']:.4f}")
print(f"各分类概率: 消极={result['probabilities'][0]:.4f}, 积极={result['probabilities'][1]:.4f}, 中性={result['probabilities'][2]:.4f}")
# 运行示例
try:
bert_sentiment_analysis()
except ImportError as e:
print(f"请安装必要的库: {e}")
print("\n安装命令: pip install transformers datasets torch evaluate scikit-learn seaborn matplotlib")

GPT(Generative Pre-trained Transformer)模型是基于Transformer解码器的自回归语言模型,在情感分析任务中也有出色表现。
GPT模型的特点:
常用GPT变体:
Python实现示例(使用Hugging Face Transformers实现GPT-2情感分析):
# 安装必要的库: pip install transformers torch
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import numpy as np
def gpt_sentiment_analysis():
# 检查设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")
# 加载GPT-2模型和分词器
model_name = "gpt2" # 可以尝试更大的模型如 "gpt2-medium" 或 "gpt2-large"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
# 将模型移至设备
model = model.to(device)
# 方法1: 使用提示工程进行情感分析
def sentiment_analysis_with_prompt(texts):
    """Prompt GPT-2 to continue a sentiment template, then keyword-match
    the continuation into 积极/消极/中性/未知.

    NOTE(review): the prompt is Chinese while the enclosing scope loads
    the English "gpt2" checkpoint, so the continuation is often
    unusable — behavior kept as in the original.
    """
    results = []
    for text in texts:
        # Sentiment-analysis prompt; the model completes after "这是一个"
        prompt = f"文本: '{text}'\n情感分析: 这是一个"
        # Generate a short continuation
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=10,  # only a few words are needed
                num_return_sequences=1,
                temperature=0.7,  # sampling randomness
                top_k=50,
                top_p=0.95,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id  # silence the pad-token warning
            )
        # Decode the full sequence
        generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
        # Keep only the generated continuation
        sentiment_part = generated_text[len(prompt):].strip()
        # Naive keyword matching over the continuation
        sentiment = "未知"
        if any(pos in sentiment_part.lower() for pos in ['积极', '正面', '好', 'positive', 'good']):
            sentiment = "积极"
        elif any(neg in sentiment_part.lower() for neg in ['消极', '负面', '坏', 'negative', 'bad']):
            sentiment = "消极"
        elif any(neu in sentiment_part.lower() for neu in ['中性', '一般', 'neutral', 'normal']):
            sentiment = "中性"
        results.append({
            'text': text,
            'sentiment': sentiment,
            'generated_analysis': sentiment_part
        })
    return results
# 方法2: 使用情感分析pipeline (内部使用特定的分类模型)
def sentiment_analysis_with_pipeline(texts):
    """Classify with a DistilBERT SST-2 pipeline, mapped to three classes.

    The underlying model is binary (POSITIVE/NEGATIVE); predictions with
    confidence below 0.7 are heuristically re-labelled as neutral.
    """
    # Sentiment-analysis pipeline backed by an SST-2 fine-tuned model
    sentiment_analyzer = pipeline(
        "sentiment-analysis",
        model="distilbert-base-uncased-finetuned-sst-2-english",  # sentiment fine-tuned checkpoint
        device=device if torch.cuda.is_available() else -1  # -1 means CPU
    )
    # Run the whole batch through the pipeline
    results = sentiment_analyzer(texts)
    # Post-process into the 3-way Chinese label scheme
    processed_results = []
    for text, result in zip(texts, results):
        # Map the binary label to 积极/消极
        sentiment = "积极" if result['label'] == 'POSITIVE' else "消极"
        confidence = result['score']
        # Low confidence is treated as neutral
        if confidence < 0.7:
            sentiment = "中性"
        processed_results.append({
            'text': text,
            'sentiment': sentiment,
            'confidence': confidence,
            'original_label': result['label']
        })
    return processed_results
# 方法3: 使用GPT模型进行零样本分类
def zero_shot_sentiment_analysis(texts):
    """Classify sentiment via zero-shot NLI without task-specific training.

    Fix: the zero-shot-classification pipeline requires a model with an
    NLI-finetuned sequence-classification head. The original passed the
    plain "gpt2" checkpoint, whose classification head would be randomly
    initialized, yielding meaningless scores; the standard BART-MNLI
    checkpoint is used instead.
    """
    zero_shot_classifier = pipeline(
        "zero-shot-classification",
        model="facebook/bart-large-mnli",  # NLI model required for zero-shot
        device=device if torch.cuda.is_available() else -1
    )
    # Candidate sentiment categories (Chinese labels)
    candidate_labels = ["积极", "消极", "中性"]
    # Run zero-shot classification for the whole batch
    results = zero_shot_classifier(texts, candidate_labels)
    # Collect the top label and the full score distribution per text
    processed_results = []
    for i, result in enumerate(results):
        text = texts[i]
        sentiment = result['labels'][0]  # highest-probability label
        scores = dict(zip(result['labels'], result['scores']))
        processed_results.append({
            'text': text,
            'sentiment': sentiment,
            'scores': scores
        })
    return processed_results
# 测试文本
test_texts = [
"This product is amazing, I love it so much!",
"I'm very disappointed with the quality of this item.",
"The weather today is neither good nor bad.",
"The service was terrible but the food was delicious.",
"I don't know how to feel about this situation."
]
# 使用提示工程方法
print("\n使用提示工程进行情感分析:")
prompt_results = sentiment_analysis_with_prompt(test_texts)
for result in prompt_results:
print(f"\n文本: {result['text']}")
print(f"预测情感: {result['sentiment']}")
print(f"生成分析: {result['generated_analysis']}")
# 使用pipeline方法
print("\n使用情感分析pipeline:")
pipeline_results = sentiment_analysis_with_pipeline(test_texts)
for result in pipeline_results:
print(f"\n文本: {result['text']}")
print(f"预测情感: {result['sentiment']}")
print(f"置信度: {result['confidence']:.4f}")
# 使用零样本分类方法
print("\n使用零样本分类:")
zero_shot_results = zero_shot_sentiment_analysis(test_texts)
for result in zero_shot_results:
print(f"\n文本: {result['text']}")
print(f"预测情感: {result['sentiment']}")
print(f"各类别概率: {result['scores']}")
# 方法对比
print("\n方法对比:")
print("1. 提示工程方法: 直接使用生成能力,但结果可能不够稳定")
print("2. 情感分析pipeline: 基于特定微调模型,准确率高但只有二分类")
print("3. 零样本分类: 无需微调,可自定义类别,但需要更大的模型")
# 运行示例
try:
gpt_sentiment_analysis()
except ImportError as e:
print(f"请安装必要的库: {e}")
print("\n安装命令: pip install transformers torch")

中文预训练模型针对中文特点进行了优化,在中文情感分析任务中表现更好。
常用中文预训练模型:
Python实现示例(使用中文预训练模型进行情感分析):
# 安装必要的库: pip install transformers torch datasets
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from datasets import Dataset, DatasetDict
import evaluate
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
def chinese_pretrained_sentiment_analysis():
    """End-to-end demo: fine-tune a Chinese pretrained model for 3-way sentiment.

    Builds a tiny toy dataset (positive / negative / neutral), fine-tunes
    hfl/chinese-roberta-wwm-ext with the HF Trainer, evaluates, plots a
    confusion matrix, saves the model, and runs inference on new texts.
    """
    # Prefer GPU when available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"使用设备: {device}")

    def create_chinese_dataset():
        """Build a stratified toy train/validation/test DatasetDict."""
        positive_samples = [
            "这个产品太棒了,我非常喜欢!",
            "服务态度很好,下次还会再来。",
            "电影很精彩,强烈推荐大家观看。",
            "食物非常美味,环境也很舒适。",
            "质量很好,价格也很实惠。"
        ]
        negative_samples = [
            "质量太差了,根本不值这个价格。",
            "服务态度非常差,很不满意。",
            "这个产品不好用,经常出问题。",
            "非常失望,不会再购买了。",
            "物流很慢,包装也破损了。"
        ]
        neutral_samples = [
            "产品一般般,没有特别的优点。",
            "服务态度还行,不算特别好。",
            "电影情节一般,特效还可以。",
            "价格适中,质量也一般。",
            "整体感觉普通,没有惊喜。"
        ]
        # BUGFIX: the original sequential slicing (texts[:10] / [10:12] /
        # [12:]) left the neutral class out of training entirely and made
        # validation/test neutral-only.  Split each class 3/1/1 instead so
        # every split contains all three labels.
        per_class = [
            (negative_samples, 0),  # label 0: negative
            (positive_samples, 1),  # label 1: positive
            (neutral_samples, 2),   # label 2: neutral
        ]
        split_texts = {'train': [], 'validation': [], 'test': []}
        split_labels = {'train': [], 'validation': [], 'test': []}
        for samples, label in per_class:
            for split_name, chunk in (('train', samples[:3]),
                                      ('validation', samples[3:4]),
                                      ('test', samples[4:])):
                split_texts[split_name].extend(chunk)
                split_labels[split_name].extend([label] * len(chunk))
        return DatasetDict({
            name: Dataset.from_dict({'text': split_texts[name],
                                     'label': split_labels[name]})
            for name in ('train', 'validation', 'test')
        })

    dataset = create_chinese_dataset()
    print(f"数据集大小: 训练集={len(dataset['train'])}, 验证集={len(dataset['validation'])}, 测试集={len(dataset['test'])}")

    # Candidate Chinese pretrained checkpoints (all published by HFL).
    model_names = {
        "BERT-wwm-ext": "hfl/chinese-bert-wwm-ext",
        "RoBERTa-wwm-ext": "hfl/chinese-roberta-wwm-ext",
        "MacBERT": "hfl/chinese-macbert-base"
    }
    model_name = model_names["RoBERTa-wwm-ext"]
    print(f"使用模型: {model_name}")

    # Load tokenizer and a fresh 3-class sequence-classification head.
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=3  # negative / positive / neutral
    )
    model = model.to(device)

    def preprocess_function(examples):
        # Fixed-length padding keeps batches rectangular without a collator.
        return tokenizer(examples['text'], truncation=True, padding='max_length', max_length=128)

    tokenized_datasets = dataset.map(preprocess_function, batched=True)

    def compute_metrics(eval_pred):
        """Accuracy + weighted F1 for the Trainer's evaluation loop."""
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        accuracy = evaluate.load("accuracy")
        accuracy_score = accuracy.compute(predictions=predictions, references=labels)
        f1 = evaluate.load("f1")
        f1_score = f1.compute(predictions=predictions, references=labels, average="weighted")
        return {
            "accuracy": accuracy_score["accuracy"],
            "f1": f1_score["f1"]
        }

    # NOTE(review): 'evaluation_strategy' was renamed 'eval_strategy' in
    # transformers >= 4.41 — adjust for the installed version.
    training_args = TrainingArguments(
        output_dir="./chinese_sentiment_analysis",
        learning_rate=2e-5,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        num_train_epochs=3,
        weight_decay=0.01,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        push_to_hub=False,
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["validation"],
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
    )

    print("开始训练模型...")
    trainer.train()

    print("评估模型...")
    eval_results = trainer.evaluate()
    print(f"评估结果: {eval_results}")

    print("在测试集上预测...")
    predictions = trainer.predict(tokenized_datasets["test"])
    predicted_labels = np.argmax(predictions.predictions, axis=-1)
    true_labels = predictions.label_ids

    print("\n分类报告:")
    target_names = ["消极", "积极", "中性"]  # index == label id
    report = classification_report(true_labels, predicted_labels, target_names=target_names)
    print(report)

    # Confusion-matrix heat map (rows = true labels, columns = predictions).
    cm = confusion_matrix(true_labels, predicted_labels)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=target_names, yticklabels=target_names)
    plt.title('混淆矩阵')
    plt.xlabel('预测标签')
    plt.ylabel('真实标签')
    plt.show()

    print("保存模型...")
    trainer.save_model("./chinese_sentiment_model")
    tokenizer.save_pretrained("./chinese_sentiment_model")

    def predict_sentiment(texts, model, tokenizer, device):
        """Run inference on raw texts; return label, confidence, distribution."""
        model.eval()
        results = []
        for text in texts:
            inputs = tokenizer(text, truncation=True, padding='max_length', max_length=128, return_tensors="pt")
            inputs = {k: v.to(device) for k, v in inputs.items()}
            with torch.no_grad():
                outputs = model(**inputs)
                logits = outputs.logits
                probabilities = torch.nn.functional.softmax(logits, dim=1)
                predicted_class = torch.argmax(probabilities, dim=1).item()
            sentiment = target_names[predicted_class]
            confidence = probabilities[0, predicted_class].item()
            results.append({
                'text': text,
                'sentiment': sentiment,
                'confidence': confidence,
                'probabilities': probabilities.cpu().numpy()[0].tolist()
            })
        return results

    # New, unseen Chinese texts for a quick sanity check.
    test_texts = [
        "这个电影真的很好看,情节紧凑,演员表演出色。",
        "服务态度太差了,等了很久都没人理我。",
        "今天天气不错,不冷不热。",
        "虽然价格有点贵,但是质量确实很好。",
        "这个产品一般般,没有想象中那么好。"
    ]
    print("\n测试新文本:")
    results = predict_sentiment(test_texts, model, tokenizer, device)
    for i, result in enumerate(results):
        print(f"\n文本 {i+1}: {result['text']}")
        print(f"预测情感: {result['sentiment']}")
        print(f"置信度: {result['confidence']:.4f}")
        print(f"各分类概率: 消极={result['probabilities'][0]:.4f}, 积极={result['probabilities'][1]:.4f}, 中性={result['probabilities'][2]:.4f}")

    print("\n不同中文预训练模型性能对比建议:")
    print("1. BERT-wwm-ext: 基础模型,性能稳定,适合一般场景")
    print("2. RoBERTa-wwm-ext: 性能优于BERT-wwm-ext,训练时间更长")
    print("3. MacBERT: 在多项中文任务上表现优异,推荐使用")
    print("4. ERNIE: 融入知识信息,适合需要知识推理的场景")
    print("5. ALBERT-zh: 轻量级模型,适合资源受限环境")
# 运行示例
try:
chinese_pretrained_sentiment_analysis()
except ImportError as e:
print(f"请安装必要的库: {e}")
print("\n安装命令: pip install transformers torch datasets evaluate scikit-learn seaborn matplotlib")极性分类是最基础的情感分析任务,旨在判断文本的整体情感倾向。
分类体系:
主要挑战:
应用场景:
评估指标:
细粒度情感分析比传统的极性分类更加精细,可以分析文本中特定实体或方面的情感倾向。
任务类型:
Python实现示例(使用BERT进行方面级情感分析):
# 安装必要的库: pip install transformers torch
import torch
from transformers import BertTokenizer, BertForSequenceClassification
import numpy as np
def aspect_based_sentiment_analysis():
    """Demo: aspect-based sentiment analysis (ABSA) with BERT sentence pairs.

    Encodes (aspect, sentence) as a proper BERT sentence pair so the model
    can condition on the aspect, then predicts one of three polarities.
    Uses an *un-fine-tuned* checkpoint, so outputs are only illustrative.
    """
    # Prefer GPU when available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"使用设备: {device}")

    # NOTE(review): bert-base-uncased is an English checkpoint while the
    # demo texts below are Chinese; swap in e.g. bert-base-chinese for
    # meaningful results.
    model_name = "bert-base-uncased"
    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = BertForSequenceClassification.from_pretrained(
        model_name,
        num_labels=3  # negative / neutral / positive
    )
    model = model.to(device)

    def analyze_aspect_sentiment(text, aspect, model, tokenizer, device):
        """Classify the sentiment expressed toward `aspect` within `text`."""
        # BUGFIX: pass (aspect, text) as a real sentence pair instead of
        # splicing a literal "[SEP]" into one string.  The pair form yields
        # [CLS] aspect [SEP] text [SEP] *with correct token_type_ids*
        # (segment 0 for the aspect, segment 1 for the sentence); the old
        # single-string form left every token in segment 0.
        inputs = tokenizer(
            aspect,
            text,
            padding=True,
            truncation=True,
            max_length=128,
            return_tensors="pt"
        )
        inputs = {k: v.to(device) for k, v in inputs.items()}
        # Inference only: eval mode + no gradient tracking.
        model.eval()
        with torch.no_grad():
            outputs = model(**inputs)
            logits = outputs.logits
            probabilities = torch.nn.functional.softmax(logits, dim=1)
            predicted_class = torch.argmax(probabilities, dim=1).item()
        # Class-id to label mapping for the 3-way head.
        sentiment_map = {0: "消极", 1: "中性", 2: "积极"}
        sentiment = sentiment_map[predicted_class]
        confidence = probabilities[0, predicted_class].item()
        return {
            'aspect': aspect,
            'sentiment': sentiment,
            'confidence': confidence,
            'probabilities': probabilities.cpu().numpy()[0].tolist()
        }

    def batch_analyze_aspect_sentiment(texts_with_aspects, model, tokenizer, device):
        """Analyze every (text, aspects) item and group the results per text."""
        results = []
        for item in texts_with_aspects:
            text = item['text']
            aspects = item['aspects']
            text_results = {
                'text': text,
                'aspect_results': []
            }
            for aspect in aspects:
                aspect_result = analyze_aspect_sentiment(
                    text, aspect, model, tokenizer, device
                )
                text_results['aspect_results'].append(aspect_result)
            results.append(text_results)
        return results

    # Each item pairs one review with the aspects to be scored.
    test_data = [
        {
            'text': "这家餐厅的食物很美味,但是服务态度很差,价格也有点贵。",
            'aspects': ["食物", "服务态度", "价格"]
        },
        {
            'text': "这款手机的屏幕显示效果非常好,电池续航也不错,但相机拍照质量一般。",
            'aspects': ["屏幕", "电池续航", "相机"]
        },
        {
            'text': "这部电影的剧情很精彩,演员表演出色,但特效做得不太好。",
            'aspects': ["剧情", "演员表演", "特效"]
        }
    ]

    print("开始方面级情感分析...")
    results = batch_analyze_aspect_sentiment(test_data, model, tokenizer, device)

    for i, result in enumerate(results):
        print(f"\n文本 {i+1}: {result['text']}")
        print("方面情感分析结果:")
        for aspect_result in result['aspect_results']:
            print(f"  - 方面: {aspect_result['aspect']}")
            print(f"    情感: {aspect_result['sentiment']}")
            print(f"    置信度: {aspect_result['confidence']:.4f}")

    # Reminder that the head above is randomly initialized for 3 classes.
    print("\n注意:在实际应用中,应在特定的方面级情感分析数据集上微调模型,如SemEval ABSA数据集。")
    print("推荐使用专门针对ABSA任务优化的模型架构,如ATAE-LSTM、IAN、RAM等。")
# 运行示例
try:
aspect_based_sentiment_analysis()
except ImportError as e:
print(f"请安装必要的库: {e}")
print("\n安装命令: pip install transformers torch")细粒度情感分析的主要挑战:
应用价值:
多模态情感分析结合了文本、图像、音频等多种模态信息,提供更全面的情感理解。
模态类型:
融合策略:
Python实现示例(简化的图像-文本多模态情感分析):
# 安装必要的库: pip install transformers torch pillow numpy
import torch
import torch.nn as nn
from transformers import BertTokenizer, BertModel, ViTModel, ViTConfig
from PIL import Image
import numpy as np
def multimodal_sentiment_analysis():
    """Demo: define a text+image multimodal sentiment classifier.

    Combines a BERT text encoder with a ViT image encoder via late fusion
    (feature concatenation -> linear projection -> classifier).  Only the
    architecture and helpers are defined; no training data is loaded.
    """
    # Prefer GPU when available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"使用设备: {device}")

    class MultimodalSentimentModel(nn.Module):
        """Late-fusion BERT + ViT classifier with 3 sentiment classes."""

        def __init__(self, text_hidden_size=768, visual_hidden_size=768, num_classes=3):
            super(MultimodalSentimentModel, self).__init__()
            # Text branch: pretrained BERT encoder.
            self.text_encoder = BertModel.from_pretrained('bert-base-uncased')
            # Visual branch: pretrained Vision Transformer.
            self.visual_encoder = ViTModel.from_pretrained('google/vit-base-patch16-224')
            # Fuse the concatenated [text; visual] features into a joint space.
            self.fusion_layer = nn.Linear(text_hidden_size + visual_hidden_size, 512)
            self.classifier = nn.Linear(512, num_classes)
            self.relu = nn.ReLU()
            self.dropout = nn.Dropout(0.3)

        def forward(self, text_inputs, visual_inputs):
            # Pooled representation from each encoder.
            text_outputs = self.text_encoder(**text_inputs)
            text_features = text_outputs.pooler_output  # [CLS] token representation
            visual_outputs = self.visual_encoder(**visual_inputs)
            visual_features = visual_outputs.pooler_output
            # Late fusion: concatenate, project, regularize, classify.
            fused_features = torch.cat((text_features, visual_features), dim=1)
            fused_features = self.fusion_layer(fused_features)
            fused_features = self.relu(fused_features)
            fused_features = self.dropout(fused_features)
            logits = self.classifier(fused_features)
            return logits

    model = MultimodalSentimentModel()
    model = model.to(device)
    text_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    def preprocess_image(image_path, size=(224, 224)):
        """Load an image into a (1, 3, 224, 224) ImageNet-normalized tensor."""
        image = Image.open(image_path).convert('RGB')
        image = image.resize(size)
        image_array = np.array(image)
        image_array = image_array / 255.0
        # ImageNet channel statistics.
        image_array = (image_array - [0.485, 0.456, 0.406]) / [0.229, 0.224, 0.225]
        image_tensor = torch.tensor(image_array).permute(2, 0, 1).unsqueeze(0).float()
        return image_tensor

    def predict_sentiment(text, image_tensor, model, text_tokenizer, device):
        """Run one text+image pair through the model; return label/confidence."""
        text_inputs = text_tokenizer(
            text,
            padding=True,
            truncation=True,
            max_length=128,
            return_tensors="pt"
        )
        text_inputs = {k: v.to(device) for k, v in text_inputs.items()}
        image_tensor = image_tensor.to(device)
        visual_inputs = {'pixel_values': image_tensor}
        model.eval()
        with torch.no_grad():
            logits = model(text_inputs, visual_inputs)
            probabilities = torch.nn.functional.softmax(logits, dim=1)
            predicted_class = torch.argmax(probabilities, dim=1).item()
        sentiment_map = {0: "消极", 1: "中性", 2: "积极"}
        sentiment = sentiment_map[predicted_class]
        confidence = probabilities[0, predicted_class].item()
        return {
            'text': text,
            'sentiment': sentiment,
            'confidence': confidence,
            'probabilities': probabilities.cpu().numpy()[0].tolist()
        }

    # No real image is loaded here; print usage guidance instead.
    print("多模态情感分析模型架构定义完成。")
    print("在实际应用中,你需要:")
    print("1. 准备带有情感标签的文本-图像对数据集")
    print("2. 在数据集上微调模型")
    print("3. 使用真实图像和文本进行测试")
    print("\n示例调用代码:")
    print("'''")
    print("# 假设我们有一个图像路径和对应的文本")
    print("image_path = 'example.jpg'")
    print("text = 'This product is amazing! I love it so much.'")
    print("\n# 预处理图像")
    print("image_tensor = preprocess_image(image_path)")
    print("\n# 预测情感")
    print("result = predict_sentiment(text, image_tensor, model, text_tokenizer, device)")
    # BUGFIX: the original two lines used unescaped double quotes inside a
    # double-quoted string literal, which is a SyntaxError.  The inner
    # quotes are escaped so the printed example text stays the same.
    print("print(f'预测情感: {result[\"sentiment\"]}')")
    print("print(f'置信度: {result[\"confidence\"]:.4f}')")
    print("'''")
# 运行示例
try:
multimodal_sentiment_analysis()
except ImportError as e:
print(f"请安装必要的库: {e}")
print("\n安装命令: pip install transformers torch pillow numpy")多模态情感分析的优势:
应用场景:
情绪检测是情感分析的一个重要变体,关注识别文本中表达的具体情绪状态,而不仅仅是极性。
主要情绪模型:
Python实现示例(使用Hugging Face进行情绪检测):
# 安装必要的库: pip install transformers torch
from transformers import pipeline
import torch
def emotion_detection():
    """Demo: fine-grained emotion detection with a Hugging Face pipeline.

    Loads a DistilRoBERTa checkpoint fine-tuned for English emotion
    classification and prints, for each sample sentence, the dominant
    emotion plus the full per-class score distribution.
    """
    # Pick GPU when available.  pipeline() takes the device as an index
    # (-1 = CPU, 0 = first CUDA device), hence the separate device_id.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    device_id = 0 if torch.cuda.is_available() else -1
    print(f"使用设备: {device}")
    # Load the emotion-detection pipeline.
    emotion_classifier = pipeline(
        "text-classification",
        model="j-hartmann/emotion-english-distilroberta-base",
        device=device_id
    )
    # Classify each text and keep the full score distribution.
    def detect_emotions(texts, classifier):
        results = []
        for text in texts:
            # return_all_scores=True yields one list of {label, score}
            # dicts per input; [0] unwraps the single-input batch.
            # NOTE(review): return_all_scores is deprecated in newer
            # transformers in favour of top_k=None — check the version.
            predictions = classifier(text, return_all_scores=True)[0]
            # Sort descending so the head of the list is the dominant emotion.
            predictions.sort(key=lambda x: x['score'], reverse=True)
            primary_emotion = predictions[0]['label']
            primary_score = predictions[0]['score']
            results.append({
                'text': text,
                'primary_emotion': primary_emotion,
                'primary_score': primary_score,
                'all_emotions': {pred['label']: pred['score'] for pred in predictions}
            })
        return results
    # Sample sentences.  NOTE(review): the model above is English-only
    # while these inputs are Chinese, so its scores here are unreliable.
    test_texts = [
        "我今天非常开心,收到了一份很棒的礼物!",
        "这个消息太可怕了,我简直不敢相信。",
        "我感到非常生气,这种行为是完全不能接受的。",
        "听到这个消息,我感到很惊讶。",
        "失去了亲人,我感到非常悲伤。",
        "这个食物的味道让我感到恶心。"
    ]
    # Run detection and report per-text results.
    print("开始情绪检测...")
    results = detect_emotions(test_texts, emotion_classifier)
    for i, result in enumerate(results):
        print(f"\n文本 {i+1}: {result['text']}")
        print(f"主要情绪: {result['primary_emotion']} (置信度: {result['primary_score']:.4f})")
        print("所有情绪得分:")
        for emotion, score in result['all_emotions'].items():
            print(f"  - {emotion}: {score:.4f}")
    # Application guidance (printed, not computed).
    print("\n情绪检测的应用场景:")
    print("1. 心理健康监测: 通过分析文本情绪变化,监测心理健康状态")
    print("2. 内容推荐: 根据用户情绪状态推荐合适的内容")
    print("3. 客户体验: 分析客户反馈中的具体情绪,优化产品和服务")
    print("4. 舆情分析: 监测社交媒体中的情绪倾向和变化")
    print("5. 教育应用: 分析学生的情绪状态,提供个性化的学习支持")
# 运行示例
try:
emotion_detection()
except ImportError as e:
print(f"请安装必要的库: {e}")
print("\n安装命令: pip install transformers torch")
except Exception as e:
print(f"运行时错误: {e}")
print("\n如果遇到网络问题,可以尝试使用本地下载的模型或其他替代方案。")情绪检测的挑战:
多语言情感分析处理不同语言的文本,涉及跨语言迁移和语言特定的处理。
主要策略:
常用多语言预训练模型:
Python实现示例(使用XLM-RoBERTa进行多语言情感分析):
# 安装必要的库: pip install transformers torch
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import numpy as np
def multilingual_sentiment_analysis():
    """Demo: multilingual sentiment analysis with an XLM-RoBERTa checkpoint.

    Uses cardiffnlp/twitter-xlm-roberta-base-sentiment to classify sample
    texts in four languages into negative / neutral / positive.
    """
    # Prefer GPU when available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"使用设备: {device}")
    # Multilingual checkpoint fine-tuned for 3-way tweet sentiment.
    model_name = "cardiffnlp/twitter-xlm-roberta-base-sentiment"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    model = model.to(device)
    # Classify each text independently; returns label, confidence, distribution.
    def analyze_sentiment(texts, model, tokenizer, device):
        results = []
        for text in texts:
            # Tokenize one text at a time (batching would be faster).
            inputs = tokenizer(
                text,
                padding=True,
                truncation=True,
                max_length=128,
                return_tensors="pt"
            )
            inputs = {k: v.to(device) for k, v in inputs.items()}
            # Inference only: eval mode + no gradient tracking.
            model.eval()
            with torch.no_grad():
                outputs = model(**inputs)
                logits = outputs.logits
                probabilities = torch.nn.functional.softmax(logits, dim=1)
                predicted_class = torch.argmax(probabilities, dim=1).item()
            # This checkpoint's label order: 0=negative, 1=neutral, 2=positive.
            sentiment_map = {0: "消极", 1: "中性", 2: "积极"}
            sentiment = sentiment_map[predicted_class]
            confidence = probabilities[0, predicted_class].item()
            results.append({
                'text': text,
                'sentiment': sentiment,
                'confidence': confidence,
                'probabilities': probabilities.cpu().numpy()[0].tolist()
            })
        return results
    # Sample texts: one positive and one negative per language.
    test_texts = [
        # English
        "I love this product, it's amazing!",
        "This is terrible, I want a refund.",
        # Chinese
        "这个产品非常好,我很满意。",
        "服务很差,我不会再买了。",
        # Spanish
        "Este producto es excelente, lo recomiendo mucho.",
        "El servicio fue malo y el precio es demasiado alto.",
        # French
        "Ce film est incroyable, je l'ai adoré!",
        "Je suis très déçu par ce service."
    ]
    print("开始多语言情感分析...")
    results = analyze_sentiment(test_texts, model, tokenizer, device)
    # Positional annotations: index -> language and expected polarity,
    # used only to make the printed report readable.
    language_map = {
        0: "英语 (Positive)",
        1: "英语 (Negative)",
        2: "中文 (Positive)",
        3: "中文 (Negative)",
        4: "西班牙语 (Positive)",
        5: "西班牙语 (Negative)",
        6: "法语 (Positive)",
        7: "法语 (Negative)"
    }
    for i, result in enumerate(results):
        lang_info = language_map.get(i, f"文本 {i+1}")
        print(f"\n{lang_info}: {result['text']}")
        print(f"预测情感: {result['sentiment']}")
        print(f"置信度: {result['confidence']:.4f}")
    # Challenges and practical advice (printed, not computed).
    print("\n多语言情感分析的挑战:")
    print("1. 语言特定的表达和文化差异")
    print("2. 低资源语言的数据不足")
    print("3. 跨语言迁移的效果不均衡")
    print("4. 语言混合和代码切换的处理")
    print("\n实用建议:")
    print("1. 对特定语言进行微调可以提高性能")
    print("2. 结合语言检测可以更准确地处理多语言输入")
    print("3. 对于低资源语言,可考虑数据增强或迁移学习")
    print("4. 评估时应分别对每种语言进行性能分析")
# 运行示例
try:
multilingual_sentiment_analysis()
except ImportError as e:
print(f"请安装必要的库: {e}")
print("\n安装命令: pip install transformers torch")
except Exception as e:
print(f"运行时错误: {e}")
print("\n如果遇到网络问题,可以尝试使用其他预训练模型或本地下载的模型。")多语言情感分析的应用价值:
应用场景:分析电商平台上的商品评论,了解用户对产品的满意度和具体关注点。
具体功能:
实施步骤:
价值与效益:
应用场景:监测社交媒体上关于品牌、产品或事件的讨论和情感倾向。
具体功能:
实施步骤:
价值与效益:
应用场景:分析新闻、社交媒体和金融报告中的情绪,预测市场趋势。
具体功能:
实施步骤:
价值与效益:
应用场景:分析客户服务对话和反馈,评估服务质量并识别改进机会。
具体功能:
实施步骤:
价值与效益:
应用场景:分析电影、电视剧等娱乐内容的评论和讨论,了解观众反应。
具体功能:
实施步骤:
价值与效益:
部署方式:
部署架构:
用户请求 → API网关 → 负载均衡器 → 模型服务集群 → 数据存储/缓存

Python实现示例(使用FastAPI部署情感分析模型):
# 安装必要的库: pip install fastapi uvicorn transformers torch pydantic
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from typing import List, Dict, Any
# Initialize the FastAPI application (title/description appear in /docs).
app = FastAPI(title="情感分析API", description="提供文本情感分析服务")
# Request/response schemas for the API.
class SentimentRequest(BaseModel):
    # Texts to analyse, plus an optional Hugging Face model id.
    # NOTE(review): the default id "bert-base-uncased-finetuned-sst-2-english"
    # does not look like a published hub checkpoint (the canonical SST-2
    # model is "distilbert-base-uncased-finetuned-sst-2-english") — verify.
    texts: List[str]
    model: str = "bert-base-uncased-finetuned-sst-2-english"
class SentimentResponse(BaseModel):
    # One result dict per input text: text, sentiment, confidence, probabilities.
    results: List[Dict[str, Any]]
# Module-level cache: model name -> {'model', 'tokenizer', 'device'}.
models = {}
# Model loader with in-process caching.
def load_model(model_name: str):
    """Return the cached {'model', 'tokenizer', 'device'} entry for
    `model_name`, downloading and preparing it on first use.

    Raises HTTPException(500) when the checkpoint cannot be loaded.
    """
    cached = models.get(model_name)
    if cached is not None:
        return cached
    try:
        target = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        tok = AutoTokenizer.from_pretrained(model_name)
        net = AutoModelForSequenceClassification.from_pretrained(model_name)
        net.to(target)
        net.eval()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"加载模型失败: {str(e)}")
    entry = {'model': net, 'tokenizer': tok, 'device': target}
    models[model_name] = entry
    return entry
# API endpoint: batch sentiment analysis.
@app.post("/analyze-sentiment", response_model=SentimentResponse)
async def analyze_sentiment(request: SentimentRequest):
    """Classify each request text with the requested model.

    Loads (and caches) the model, runs one forward pass per text, and
    returns label, confidence and the full probability distribution.
    Any failure is surfaced as HTTP 500.
    """
    try:
        # Fetch from the cache, downloading on first use.
        model_info = load_model(request.model)
        model = model_info['model']
        tokenizer = model_info['tokenizer']
        device = model_info['device']
        results = []
        for text in request.texts:
            # Per-text inference (no batching) keeps memory usage small.
            inputs = tokenizer(text, truncation=True, padding=True, max_length=128, return_tensors="pt")
            inputs = {k: v.to(device) for k, v in inputs.items()}
            with torch.no_grad():
                outputs = model(**inputs)
                logits = outputs.logits
                probabilities = torch.nn.functional.softmax(logits, dim=1)
                predicted_class = torch.argmax(probabilities, dim=1).item()
            # Assumes a binary head with 0=negative, 1=positive.
            # NOTE(review): verify against the loaded model's id2label.
            sentiment = "积极" if predicted_class == 1 else "消极"
            confidence = probabilities[0, predicted_class].item()
            results.append({
                'text': text,
                'sentiment': sentiment,
                'confidence': float(confidence),
                'probabilities': probabilities.cpu().numpy()[0].tolist()
            })
        return SentimentResponse(results=results)
    except Exception as e:
        # Surface any failure as a 500 with the underlying message.
        raise HTTPException(status_code=500, detail=f"分析过程中出错: {str(e)}")
# Health-check endpoint for load balancers / monitoring.
@app.get("/health")
async def health_check():
    """Report service liveness and the names of models loaded so far."""
    return {"status": "healthy", "models_loaded": list(models.keys())}
# 启动服务器命令: uvicorn app:app --host 0.0.0.0 --port 8000
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)模型压缩技术:
性能优化技术:
Python实现示例(使用ONNX进行模型加速):
# 安装必要的库: pip install transformers torch onnx onnxruntime
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import onnx
import onnxruntime as ort
import numpy as np
import time
def optimize_model_with_onnx():
    """Export a BERT sentiment classifier to ONNX and benchmark vs PyTorch.

    Traces the model with a dummy input, validates the exported graph,
    then times per-text inference in both runtimes and checks that the
    predicted labels agree.
    """
    # NOTE(review): the hub id "bert-base-uncased-finetuned-sst-2-english"
    # may not exist; the canonical SST-2 checkpoint is
    # "distilbert-base-uncased-finetuned-sst-2-english" — verify.
    model_name = "bert-base-uncased-finetuned-sst-2-english"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    # Inference mode (disables dropout) before tracing.
    model.eval()
    # Example input used to trace the graph during export.
    dummy_input = tokenizer(
        "This is a sample text for ONNX conversion.",
        truncation=True,
        padding=True,
        max_length=128,
        return_tensors="pt"
    )
    # Export with dynamic batch/sequence axes so any input size works.
    onnx_path = "sentiment_analysis_model.onnx"
    torch.onnx.export(
        model,
        (
            dummy_input['input_ids'],
            dummy_input['attention_mask'],
            dummy_input.get('token_type_ids', None)
        ),
        onnx_path,
        export_params=True,
        opset_version=12,
        do_constant_folding=True,
        input_names=['input_ids', 'attention_mask', 'token_type_ids'],
        output_names=['logits'],
        dynamic_axes={
            'input_ids': {0: 'batch_size', 1: 'sequence_length'},
            'attention_mask': {0: 'batch_size', 1: 'sequence_length'},
            'token_type_ids': {0: 'batch_size', 1: 'sequence_length'},
            'logits': {0: 'batch_size'}
        }
    )
    # Structural validation of the exported graph.
    onnx_model = onnx.load(onnx_path)
    onnx.checker.check_model(onnx_model)
    print(f"ONNX模型导出成功: {onnx_path}")
    # Default (CPU) execution provider.
    ort_session = ort.InferenceSession(onnx_path)
    # Reference implementation: plain PyTorch forward pass per text.
    def torch_inference(texts, model, tokenizer):
        results = []
        for text in texts:
            inputs = tokenizer(text, truncation=True, padding=True, max_length=128, return_tensors="pt")
            with torch.no_grad():
                outputs = model(**inputs)
                logits = outputs.logits
                probabilities = torch.nn.functional.softmax(logits, dim=1)
                predicted_class = torch.argmax(probabilities, dim=1).item()
            sentiment = "积极" if predicted_class == 1 else "消极"
            results.append((sentiment, probabilities[0, predicted_class].item()))
        return results
    # ONNX Runtime equivalent using numpy tensors.
    def onnx_inference(texts, session, tokenizer):
        results = []
        for text in texts:
            inputs = tokenizer(text, truncation=True, padding=True, max_length=128, return_tensors="np")
            ort_inputs = {
                'input_ids': inputs['input_ids'],
                'attention_mask': inputs['attention_mask']
            }
            # token_type_ids was declared as a graph input at export time;
            # BERT tokenizers normally emit it, so this branch is taken.
            if 'token_type_ids' in inputs:
                ort_inputs['token_type_ids'] = inputs['token_type_ids']
            ort_outs = session.run(['logits'], ort_inputs)
            logits = ort_outs[0]
            # Softmax over the class axis.  NOTE(review): no max-subtraction,
            # so very large logits could overflow — fine for this demo.
            probabilities = np.exp(logits) / np.sum(np.exp(logits), axis=1, keepdims=True)
            predicted_class = np.argmax(probabilities, axis=1).item()
            sentiment = "积极" if predicted_class == 1 else "消极"
            results.append((sentiment, probabilities[0, predicted_class].item()))
        return results
    # Benchmark texts.
    test_texts = [
        "I love this product, it's amazing!",
        "This is terrible, I want a refund.",
        "The service was okay, not great but not terrible either.",
        "I'm very satisfied with my purchase and will definitely buy again."
    ]
    # Wall-clock timing: single pass, no warm-up — indicative only.
    torch_start = time.time()
    torch_results = torch_inference(test_texts, model, tokenizer)
    torch_end = time.time()
    torch_time = torch_end - torch_start
    onnx_start = time.time()
    onnx_results = onnx_inference(test_texts, ort_session, tokenizer)
    onnx_end = time.time()
    onnx_time = onnx_end - onnx_start
    # Report timings and speed-up ratio.
    print("\n性能比较:")
    print(f"PyTorch推理时间: {torch_time:.4f}秒")
    print(f"ONNX推理时间: {onnx_time:.4f}秒")
    print(f"加速比: {torch_time / onnx_time:.2f}倍")
    # Cross-check that both runtimes predict the same labels.
    print("\n结果一致性验证:")
    for i, (torch_res, onnx_res) in enumerate(zip(torch_results, onnx_results)):
        print(f"文本 {i+1}:")
        print(f"  PyTorch: 情感={torch_res[0]}, 置信度={torch_res[1]:.4f}")
        print(f"  ONNX: 情感={onnx_res[0]}, 置信度={onnx_res[1]:.4f}")
        print(f"  一致性: {'✓' if torch_res[0] == onnx_res[0] else '✗'}")
    print("\nONNX优化总结:")
    print("1. 模型推理速度显著提升")
    print("2. 可以在多种硬件和平台上部署")
    print("3. 减少了对PyTorch的依赖")
    print("4. 适用于生产环境部署")
# 运行示例
try:
optimize_model_with_onnx()
except ImportError as e:
print(f"请安装必要的库: {e}")
print("\n安装命令: pip install transformers torch onnx onnxruntime")
except Exception as e:
print(f"运行时错误: {e}")CI/CD流程设计:
工具与平台推荐:
最佳实践:
模型架构演进:
学习范式创新:
任务定义扩展:
新兴应用领域:
主要挑战:
发展机遇:
基于任务类型选择:
| 任务类型 | 推荐模型 | 优势 | 适用场景 |
|---|---|---|---|
| 基础极性分类 | BERT、RoBERTa、DistilBERT | 准确率高,易于实现 | 产品评论分析、社交媒体监控 |
| 细粒度情感分析 | ATAE-LSTM、IAN、BERT-ABSA | 能够识别方面级情感 | 产品改进、客户满意度分析 |
| 情绪检测 | RoBERTa-emotion、BERT-emotion | 能够识别具体情绪类型 | 心理健康监测、内容推荐 |
| 多语言情感分析 | XLM-RoBERTa、mBERT | 支持多种语言 | 全球化业务、国际舆情监测 |
| 多模态情感分析 | VisualBERT、ViLBERT | 结合多种模态信息 | 社交媒体内容分析、视频内容分析 |
基于资源限制选择:
数据收集与标注:
数据预处理:
数据增强:
训练策略:
评估方法:
常见问题排查:
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 训练集性能好,测试集性能差 | 过拟合 | 增加正则化、使用Dropout、数据增强 |
| 模型对否定词处理不好 | 上下文理解不足 | 使用双向模型、增加否定词相关训练数据 |
| 模型在新领域表现差 | 领域适应问题 | 进行领域适应微调、使用领域特定数据 |
| 多语言模型在某些语言上表现差 | 数据不均衡 | 增加低资源语言的训练数据、针对性微调 |
部署架构:
监控与维护:
成本优化:
通过本文的详细介绍,相信读者已经对情感分析的各种变体、技术实现和应用场景有了全面的了解。在实际应用中,应根据具体需求选择合适的模型和方法,并不断优化和改进,以获得最佳的效果。随着人工智能技术的不断发展,情感分析将在更多领域发挥重要作用,为企业和社会创造更大的价值。