引言
我正在为SE API 2.0构建一个API包装器
目前,我正在实现一个缓存特性,这直到现在才是一个问题。现在我考虑的是并发性。这将是我的测试方法:
代码
public static void TestConcurrency()
{
Stopwatch sw = new Stopwatch();
sw.Start();
IList<Task> tasks = new List<Task>();
for (int i = 0; i < 1000; i++)
{
tasks.Add(Task.Factory.StartNew(p => client.GetAnswers(), null));
}
Task.WaitAll(tasks.ToArray());
sw.Stop();
Console.WriteLine("elapsed: {0}", sw.Elapsed.ToString());
Console.ReadKey();
}描述
在内部,客户机有一个RequestHandler类,它试图从缓存中获取一个值,如果没有这样做,它将执行实际的请求。
代码
/// <summary>
/// Checks the cache and then performs the actual request, if required.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result against which to deserialize JSON.</typeparam>
/// <param name="endpoint">The API endpoint to query.</param>
/// <returns>The API response object.</returns>
private IApiResponse<T> InternalProcessing<T>(string endpoint) where T : class
{
IApiResponse<T> result = FetchFromCache<T>(endpoint);
return result ?? PerformRequest<T>(endpoint);
}描述
实际执行请求的代码与此问题无关。试图访问缓存的代码执行以下操作:
代码
/// <summary>
/// Attempts to fetch the response object from the cache instead of directly from the API.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result against which to deserialize JSON.</typeparam>
/// <param name="endpoint">The API endpoint to query.</param>
/// <returns>The API response object.</returns>
private IApiResponse<T> FetchFromCache<T>(string endpoint) where T : class
{
IApiResponseCacheItem<T> cacheItem = Store.Get<T>(endpoint);
if (cacheItem != null)
{
IApiResponse<T> result = cacheItem.Response;
result.Source = ResultSourceEnum.Cache;
return result;
}
return null;
}描述
缓存存储的实际实现在ConcurrentDictionary上工作,当调用Get<T>()方法时,即:
endpoint.
Processing,线程将被休眠一小段时间,等待实际请求为completed.
的条目,则将null作为endpoint的响应推送到缓存中,发出处理该端点上的请求的信号,并且不需要在同一端点上发出更多请求。然后返回null,发出发出实际请求的信号。
代码
/// <summary>
/// Attempts to access the internal cache and retrieve a response cache item without querying the API.
/// <para>If the endpoint is not present in the cache yet, null is returned, but the endpoint is added to the cache.</para>
/// <para>If the endpoint is present, it means the request is being processed. In this case we will wait on the processing to end before returning a result.</para>
/// </summary>
/// <typeparam name="T">The strong type of the expected API result.</typeparam>
/// <param name="endpoint">The API endpoint</param>
/// <returns>Returns an API response cache item if successful, null otherwise.</returns>
public IApiResponseCacheItem<T> Get<T>(string endpoint) where T : class
{
IApiResponseCacheItem cacheItem;
if (Cache.TryGetValue(endpoint, out cacheItem))
{
while (cacheItem.IsFresh && cacheItem.State == CacheItemStateEnum.Processing)
{
Thread.Sleep(10);
}
if (cacheItem.IsFresh && cacheItem.State == CacheItemStateEnum.Cached)
{
return (IApiResponseCacheItem<T>)cacheItem;
}
IApiResponseCacheItem value;
Cache.TryRemove(endpoint, out value);
}
Push<T>(endpoint, null);
return null;
}这个问题是不确定的,有时有两个请求使它通过,而不是仅仅一个像它设计的那样发生。
我在想,一路上有一些不是线程安全的东西正在被访问。但我不知道那可能是什么。它可能是什么,或者我应该如何正确地调试它?
更新
问题是我在ConcurrentDictionary上并不总是线程安全
此方法没有重新调整指示缓存是否已成功更新的boolean,因此,如果此方法失败,null将被Get<T>()返回两次。
代码
/// <summary>
/// Attempts to push API responses into the cache store.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result.</typeparam>
/// <param name="endpoint">The queried API endpoint.</param>
/// <param name="response">The API response.</param>
/// <returns>True if the operation was successful, false otherwise.</returns>
public bool Push<T>(string endpoint, IApiResponse<T> response) where T : class
{
if (endpoint.NullOrEmpty())
{
return false;
}
IApiResponseCacheItem item;
if (Cache.TryGetValue(endpoint, out item))
{
((IApiResponseCacheItem<T>)item).UpdateResponse(response);
return true;
}
else
{
item = new ApiResponseCacheItem<T>(response);
return Cache.TryAdd(endpoint, item);
}
}描述
解决方案是实现返回值,并更改添加以下内容的Get<T>():
代码
if (Push<T>(endpoint, null) || retries > 1) // max retries for sanity.
{
return null;
}
else
{
return Get<T>(endpoint, ++retries); // retry push.
}发布于 2012-01-21 21:17:20
IApiResponseCacheItem<T> cacheItem = Store.Get<T>(endpoint);
if (cacheItem != null)
{
// etc..
}ConcurrentDirectionary是线程安全的,但这不会自动使代码线程安全。上面的片段是问题的核心。两个线程可以同时调用Get()方法并获得null。它们将同时继续并调用PerformRequest()。您需要合并InternalProcessing()和FetchFromCache(),并确保只有一个线程可以使用锁调用PerformRequest。这可能会产生很差的并发性,也许您可以删除一个重复的响应。无论如何,这些请求都可能是由SE服务器序列化的,所以可能并不重要。
https://stackoverflow.com/questions/8956286
复制相似问题