我必须在我的公司的CRM解决方案(Oracle的)中查询我们的600k用户,如果他们存在,则在那里更新他们,或者在他们不存在的情况下创建他们。对于60万用户,这可能是一个真正的痛苦,因为每次获得响应所需的时间(大约1秒)。因此,我设法将代码更改为使用Parallel.ForEach,在0,35秒内查询每条记录,并将其添加到要创建或要更新的记录的List<User>中(现在有点愚蠢,所以我需要将它们分隔在两个列表中,并调用两个不同的WS方法)。
在多线程之前,我的代码运行得很好,但时间太长了。问题是我不能让批处理太大,或者在尝试通过Web服务进行更新或创建时超时。所以我一次向他们发送大约500条记录,当它运行关键代码部分时,它会执行很多次。
Parallel.ForEach(boDS.USERS.AsEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = -1 }, row =>
{
...
user = null;
user = QueryUserById(row["USER_ID"].Trim());
if (user == null)
{
isUpdate = false;
gObject.ID = new ID();
}
else
{
isUpdate = true;
gObject.ID = user.ID;
}
... fill user attributes as generic fields ...
gObject.GenericFields = listGenericFields.ToArray();
if (isUpdate)
listUserUpdate.Add(gObject);
else
listUserCreate.Add(gObject);
if (i == batchSize - 1 || i == (boDS.USERS.Rows.Count - 1))
{
UpdateProcessingOptions upo = new UpdateProcessingOptions();
CreateProcessingOptions cpo = new CreateProcessingOptions();
upo.SuppressExternalEvents = false;
upo.SuppressRules = false;
cpo.SuppressExternalEvents = false;
cpo.SuppressRules = false;
RNObject[] results = null;
// <Critical_code>
if (listUserCreate.Count > 0)
{
results = _service.Create(_clientInfoHeader, listUserCreate.ToArray(), cpo);
}
if (listUserUpdate.Count > 0)
{
_service.Update(_clientInfoHeader, listUserUpdate.ToArray(), upo);
}
// </Critical_code>
listUserUpdate = new List<RNObject>();
listUserCreate = new List<RNObject>();
}
i++;
});我考虑过使用lock或mutex,但这对我没有帮助,因为它们只会等待执行。我需要一些解决方案,只执行一次,只在一个线程的那部分代码。有可能吗?有谁能分享一些光吗?
谢谢,向你致以亲切的问候,莱安德罗
发布于 2014-07-24 06:29:44
您可以使用Double-checked locking模式。这通常用于单例,但是您在这里没有创建单例,所以像Lazy<T>这样的通用单例不适用。
它是这样工作的:
对外部类执行class QuerySharedData { // All the write-once-read-many fields that need to be shared between threads public QuerySharedData() { // Compute all the write-once-read-many fields. Or use a static Create method if that's handy. } }
object padlock; volatile QuerySharedData data
if (data == null) { lock (padlock) { if (data == null) { data = new QuerySharedData(); // this does all the work to initialize the shared fields } } } var localData = data
然后,通过将共享查询数据分组到一个从属类中来使用localData中的共享查询数据,从而避免了使其各个字段不稳定的必要性。
有关volatile的更多信息,请访问:Part 4: Advanced Threading。
更新我在这里的假设是,QuerySharedData持有的所有类和字段在初始化后都是只读的。如果这不是真的,例如,如果你初始化一个列表一次,但在许多线程中添加到它,这个模式将不会为你工作。你将不得不考虑使用像Thread-Safe Collections这样的东西。
发布于 2014-07-24 06:34:32
正如您在注释中所述,您在循环体之外声明了变量。这就是你的竞争条件的来源。
让我们以变量listUserUpdate为例。它由并行执行的线程随机访问。当一个线程仍在添加时,例如在listUserUpdate.Add(gObject);中,另一个线程可能已经在listUserUpdate = new List<RNObject>();中重置列表或在listUserUpdate.ToArray()中枚举列表。
您确实需要将该代码重构为
以同步的方式访问数据,从而使每个循环尽可能独立地运行
https://stackoverflow.com/questions/24921380
复制相似问题