Moved connect limits per service to AIPerServiceRequestQueue.

Added AIPerServiceRequestQueue::mConcurrectConnections and
AIPerServiceRequestQueue::mMaxPipelinedRequests.
This commit is contained in:
Aleric Inglewood
2013-04-26 19:13:18 +02:00
parent 304e2b4958
commit 6c1335af50
3 changed files with 53 additions and 30 deletions

View File

@@ -39,6 +39,7 @@
#include "sys.h"
#include "aicurlperservice.h"
#include "aicurlthread.h"
#include "llcontrol.h"
AIPerServiceRequestQueue::threadsafe_instance_map_type AIPerServiceRequestQueue::sInstanceMap;
LLAtomicS32 AIPerServiceRequestQueue::sTotalQueued;
@@ -50,7 +51,8 @@ bool AIPerServiceRequestQueue::sRequestStarvation;
namespace AICurlPrivate {
U32 curl_concurrent_connections_per_service;
// Cached value of CurlConcurrentConnectionsPerService.
U32 CurlConcurrentConnectionsPerService;
// Friend functions of RefCountedThreadSafePerServiceRequestQueue
@@ -71,6 +73,14 @@ void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* per_servi
using namespace AICurlPrivate;
AIPerServiceRequestQueue::AIPerServiceRequestQueue(void) :
mQueuedCommands(0), mAdded(0), mQueueEmpty(false),
mQueueFull(false), mRequestStarvation(false), mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms.
mConcurrectConnections(CurlConcurrentConnectionsPerService),
mMaxPipelinedRequests(CurlConcurrentConnectionsPerService)
{
}
// url must be of the form
// (see http://www.ietf.org/rfc/rfc3986.txt Appendix A for definitions not given here):
//
@@ -220,13 +230,11 @@ void AIPerServiceRequestQueue::release(AIPerServiceRequestQueuePtr& instance)
bool AIPerServiceRequestQueue::throttled() const
{
llassert(mAdded <= int(curl_concurrent_connections_per_service));
return mAdded == int(curl_concurrent_connections_per_service);
return mAdded >= mConcurrectConnections;
}
void AIPerServiceRequestQueue::added_to_multi_handle(void)
{
llassert(mAdded < int(curl_concurrent_connections_per_service));
++mAdded;
}
@@ -323,3 +331,17 @@ void AIPerServiceRequestQueue::purge(void)
}
}
//static
void AIPerServiceRequestQueue::adjust_concurrent_connections(int increment)
{
instance_map_wat instance_map_w(sInstanceMap);
for (AIPerServiceRequestQueue::iterator iter = instance_map_w->begin(); iter != instance_map_w->end(); ++iter)
{
PerServiceRequestQueue_wat per_service_w(*iter->second);
U32 old_concurrent_connections = per_service_w->mConcurrectConnections;
per_service_w->mConcurrectConnections = llclamp(old_concurrent_connections + increment, (U32)1, CurlConcurrentConnectionsPerService);
increment = per_service_w->mConcurrectConnections - old_concurrent_connections;
per_service_w->mMaxPipelinedRequests = llmax(per_service_w->mMaxPipelinedRequests + increment, 0);
}
}

View File

@@ -90,7 +90,7 @@ class AIPerServiceRequestQueue {
static threadsafe_instance_map_type sInstanceMap; // Map of AIPerServiceRequestQueue instances with the hostname as key.
friend class AIThreadSafeSimpleDC<AIPerServiceRequestQueue>; //threadsafe_PerServiceRequestQueue
AIPerServiceRequestQueue(void) : mQueuedCommands(0), mAdded(0), mQueueEmpty(false), mQueueFull(false), mRequestStarvation(false), mHTTPBandwidth(25) { } // 25 = 1000 ms / 40 ms.
AIPerServiceRequestQueue(void);
public:
typedef instance_map_type::iterator iterator;
@@ -125,7 +125,9 @@ class AIPerServiceRequestQueue {
static bool sQueueFull; // Set to true when sTotalQueued is still larger than zero after popping any queue.
static bool sRequestStarvation; // Set to true when any queue was about to be popped when sTotalQueued was already zero.
AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second.
AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second.
int mConcurrectConnections; // The maximum number of allowed concurrent connections to this service.
int mMaxPipelinedRequests; // The maximum number of accepted requests that didn't finish yet.
public:
void added_to_command_queue(void) { ++mQueuedCommands; }
@@ -147,6 +149,9 @@ class AIPerServiceRequestQueue {
AIAverage& bandwidth(void) { return mHTTPBandwidth; }
AIAverage const& bandwidth(void) const { return mHTTPBandwidth; }
// Called when CurlConcurrentConnectionsPerService changes.
static void adjust_concurrent_connections(int increment);
// Returns true if curl can handle another request for this host.
// Should return false if the maximum allowed HTTP bandwidth is reached, or when
// the latency between request and actual delivery becomes too large.
@@ -172,7 +177,7 @@ class RefCountedThreadSafePerServiceRequestQueue : public threadsafe_PerServiceR
friend void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* p);
};
extern U32 curl_concurrent_connections_per_service;
extern U32 CurlConcurrentConnectionsPerService;
} // namespace AICurlPrivate

View File

@@ -207,7 +207,6 @@ int ioctlsocket(int fd, int, unsigned long* nonblocking_enable)
namespace AICurlPrivate {
LLAtomicS32 max_pipelined_requests(32);
LLAtomicS32 max_pipelined_requests_per_service(8);
enum command_st {
cmd_none,
@@ -2510,10 +2509,9 @@ void startCurlThread(LLControlGroup* control_group)
// Cache Debug Settings.
sConfigGroup = control_group;
curl_max_total_concurrent_connections = sConfigGroup->getU32("CurlMaxTotalConcurrentConnections");
curl_concurrent_connections_per_service = sConfigGroup->getU32("CurlConcurrentConnectionsPerService");
CurlConcurrentConnectionsPerService = sConfigGroup->getU32("CurlConcurrentConnectionsPerService");
gNoVerifySSLCert = sConfigGroup->getBOOL("NoVerifySSLCert");
max_pipelined_requests = curl_max_total_concurrent_connections;
max_pipelined_requests_per_service = curl_concurrent_connections_per_service;
AICurlThread::sInstance = new AICurlThread;
AICurlThread::sInstance->start();
@@ -2535,10 +2533,10 @@ bool handleCurlConcurrentConnectionsPerService(LLSD const& newvalue)
{
using namespace AICurlPrivate;
U32 old = curl_concurrent_connections_per_service;
curl_concurrent_connections_per_service = newvalue.asInteger();
max_pipelined_requests_per_service += curl_concurrent_connections_per_service - old;
llinfos << "CurlConcurrentConnectionsPerService set to " << curl_concurrent_connections_per_service << llendl;
U32 new_concurrent_connections = newvalue.asInteger();
AIPerServiceRequestQueue::adjust_concurrent_connections(new_concurrent_connections - CurlConcurrentConnectionsPerService);
CurlConcurrentConnectionsPerService = new_concurrent_connections;
llinfos << "CurlConcurrentConnectionsPerService set to " << CurlConcurrentConnectionsPerService << llendl;
return true;
}
@@ -2630,34 +2628,32 @@ bool AIPerServiceRequestQueue::wantsMoreHTTPRequestsFor(AIPerServiceRequestQueue
AIAverage* http_bandwidth_ptr;
// Atomic read max_pipelined_requests_per_service for the below calculations.
S32 const max_pipelined_requests_per_service_cache = max_pipelined_requests_per_service;
{
PerServiceRequestQueue_wat per_service_w(*per_service);
S32 const pipelined_requests_per_service = per_service_w->pipelined_requests();
reject = pipelined_requests_per_service >= max_pipelined_requests_per_service_cache;
equal = pipelined_requests_per_service == max_pipelined_requests_per_service_cache;
reject = pipelined_requests_per_service >= per_service_w->mMaxPipelinedRequests;
equal = pipelined_requests_per_service == per_service_w->mMaxPipelinedRequests;
increment_threshold = per_service_w->mRequestStarvation;
decrement_threshold = per_service_w->mQueueFull && !per_service_w->mQueueEmpty;
// Reset flags.
per_service_w->mQueueFull = per_service_w->mQueueEmpty = per_service_w->mRequestStarvation = false;
// Grab per service bandwidth object.
http_bandwidth_ptr = &per_service_w->bandwidth();
}
if (decrement_threshold)
{
if (max_pipelined_requests_per_service_cache > (S32)curl_concurrent_connections_per_service)
if (decrement_threshold)
{
--max_pipelined_requests_per_service;
if (per_service_w->mMaxPipelinedRequests > per_service_w->mConcurrectConnections)
{
per_service_w->mMaxPipelinedRequests--;
}
}
}
else if (increment_threshold && reject)
{
if (max_pipelined_requests_per_service_cache < 2 * (S32)curl_concurrent_connections_per_service)
else if (increment_threshold && reject)
{
max_pipelined_requests_per_service++;
// Immediately take the new threshold into account.
reject = !equal;
if (per_service_w->mMaxPipelinedRequests < 2 * per_service_w->mConcurrectConnections)
{
per_service_w->mMaxPipelinedRequests++;
// Immediately take the new threshold into account.
reject = !equal;
}
}
}
if (reject)