diff --git a/indra/llmessage/aicurlperservice.cpp b/indra/llmessage/aicurlperservice.cpp index e50dac4c9..533111237 100644 --- a/indra/llmessage/aicurlperservice.cpp +++ b/indra/llmessage/aicurlperservice.cpp @@ -39,6 +39,7 @@ #include "sys.h" #include "aicurlperservice.h" #include "aicurlthread.h" +#include "llcontrol.h" AIPerServiceRequestQueue::threadsafe_instance_map_type AIPerServiceRequestQueue::sInstanceMap; LLAtomicS32 AIPerServiceRequestQueue::sTotalQueued; @@ -50,7 +51,8 @@ bool AIPerServiceRequestQueue::sRequestStarvation; namespace AICurlPrivate { -U32 curl_concurrent_connections_per_service; +// Cached value of CurlConcurrentConnectionsPerService. +U32 CurlConcurrentConnectionsPerService; // Friend functions of RefCountedThreadSafePerServiceRequestQueue @@ -71,6 +73,14 @@ void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* per_servi using namespace AICurlPrivate; +AIPerServiceRequestQueue::AIPerServiceRequestQueue(void) : + mQueuedCommands(0), mAdded(0), mQueueEmpty(false), + mQueueFull(false), mRequestStarvation(false), mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms. 
+ mConcurrentConnections(CurlConcurrentConnectionsPerService), + mMaxPipelinedRequests(CurlConcurrentConnectionsPerService) +{ +} + // url must be of the form // (see http://www.ietf.org/rfc/rfc3986.txt Appendix A for definitions not given here): // @@ -220,13 +230,11 @@ void AIPerServiceRequestQueue::release(AIPerServiceRequestQueuePtr& instance) bool AIPerServiceRequestQueue::throttled() const { - llassert(mAdded <= int(curl_concurrent_connections_per_service)); - return mAdded == int(curl_concurrent_connections_per_service); + return mAdded >= mConcurrentConnections; } void AIPerServiceRequestQueue::added_to_multi_handle(void) { - llassert(mAdded < int(curl_concurrent_connections_per_service)); ++mAdded; } @@ -323,3 +331,17 @@ void AIPerServiceRequestQueue::purge(void) } } +//static +void AIPerServiceRequestQueue::adjust_concurrent_connections(int increment) +{ + instance_map_wat instance_map_w(sInstanceMap); + for (AIPerServiceRequestQueue::iterator iter = instance_map_w->begin(); iter != instance_map_w->end(); ++iter) + { + PerServiceRequestQueue_wat per_service_w(*iter->second); + int old_concurrent_connections = per_service_w->mConcurrentConnections; + per_service_w->mConcurrentConnections = llclamp(old_concurrent_connections + increment, 1, (int)CurlConcurrentConnectionsPerService); + increment = per_service_w->mConcurrentConnections - old_concurrent_connections; + per_service_w->mMaxPipelinedRequests = llmax(per_service_w->mMaxPipelinedRequests + increment, 0); + } +} + diff --git a/indra/llmessage/aicurlperservice.h b/indra/llmessage/aicurlperservice.h index 8cd310f40..839aa5125 100644 --- a/indra/llmessage/aicurlperservice.h +++ b/indra/llmessage/aicurlperservice.h @@ -90,7 +90,7 @@ class AIPerServiceRequestQueue { static threadsafe_instance_map_type sInstanceMap; // Map of AIPerServiceRequestQueue instances with the hostname as key. 
friend class AIThreadSafeSimpleDC; //threadsafe_PerServiceRequestQueue - AIPerServiceRequestQueue(void) : mQueuedCommands(0), mAdded(0), mQueueEmpty(false), mQueueFull(false), mRequestStarvation(false), mHTTPBandwidth(25) { } // 25 = 1000 ms / 40 ms. + AIPerServiceRequestQueue(void); public: typedef instance_map_type::iterator iterator; @@ -125,7 +125,9 @@ class AIPerServiceRequestQueue { static bool sQueueFull; // Set to true when sTotalQueued is still larger than zero after popping any queue. static bool sRequestStarvation; // Set to true when any queue was about to be popped when sTotalQueued was already zero. - AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second. + AIAverage mHTTPBandwidth; // Keeps track of the number of bytes received for this service in the past second. + int mConcurrentConnections; // The maximum number of allowed concurrent connections to this service. + int mMaxPipelinedRequests; // The maximum number of accepted requests that didn't finish yet. public: void added_to_command_queue(void) { ++mQueuedCommands; } @@ -147,6 +149,9 @@ class AIPerServiceRequestQueue { AIAverage& bandwidth(void) { return mHTTPBandwidth; } AIAverage const& bandwidth(void) const { return mHTTPBandwidth; } + // Called when CurlConcurrentConnectionsPerService changes. + static void adjust_concurrent_connections(int increment); + // Returns true if curl can handle another request for this host. // Should return false if the maximum allowed HTTP bandwidth is reached, or when // the latency between request and actual delivery becomes too large. 
@@ -172,7 +177,7 @@ class RefCountedThreadSafePerServiceRequestQueue : public threadsafe_PerServiceR friend void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* p); }; -extern U32 curl_concurrent_connections_per_service; +extern U32 CurlConcurrentConnectionsPerService; } // namespace AICurlPrivate diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index f8aad0887..9e9321e41 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -207,7 +207,6 @@ int ioctlsocket(int fd, int, unsigned long* nonblocking_enable) namespace AICurlPrivate { LLAtomicS32 max_pipelined_requests(32); -LLAtomicS32 max_pipelined_requests_per_service(8); enum command_st { cmd_none, @@ -2510,10 +2509,9 @@ void startCurlThread(LLControlGroup* control_group) // Cache Debug Settings. sConfigGroup = control_group; curl_max_total_concurrent_connections = sConfigGroup->getU32("CurlMaxTotalConcurrentConnections"); - curl_concurrent_connections_per_service = sConfigGroup->getU32("CurlConcurrentConnectionsPerService"); + CurlConcurrentConnectionsPerService = sConfigGroup->getU32("CurlConcurrentConnectionsPerService"); gNoVerifySSLCert = sConfigGroup->getBOOL("NoVerifySSLCert"); max_pipelined_requests = curl_max_total_concurrent_connections; - max_pipelined_requests_per_service = curl_concurrent_connections_per_service; AICurlThread::sInstance = new AICurlThread; AICurlThread::sInstance->start(); @@ -2535,10 +2533,10 @@ bool handleCurlConcurrentConnectionsPerService(LLSD const& newvalue) { using namespace AICurlPrivate; - U32 old = curl_concurrent_connections_per_service; - curl_concurrent_connections_per_service = newvalue.asInteger(); - max_pipelined_requests_per_service += curl_concurrent_connections_per_service - old; - llinfos << "CurlConcurrentConnectionsPerService set to " << curl_concurrent_connections_per_service << llendl; + U32 new_concurrent_connections = newvalue.asInteger(); + 
AIPerServiceRequestQueue::adjust_concurrent_connections(new_concurrent_connections - CurlConcurrentConnectionsPerService); + CurlConcurrentConnectionsPerService = new_concurrent_connections; + llinfos << "CurlConcurrentConnectionsPerService set to " << CurlConcurrentConnectionsPerService << llendl; return true; } @@ -2630,34 +2628,32 @@ bool AIPerServiceRequestQueue::wantsMoreHTTPRequestsFor(AIPerServiceRequestQueue AIAverage* http_bandwidth_ptr; - // Atomic read max_pipelined_requests_per_service for the below calculations. - S32 const max_pipelined_requests_per_service_cache = max_pipelined_requests_per_service; { PerServiceRequestQueue_wat per_service_w(*per_service); S32 const pipelined_requests_per_service = per_service_w->pipelined_requests(); - reject = pipelined_requests_per_service >= max_pipelined_requests_per_service_cache; - equal = pipelined_requests_per_service == max_pipelined_requests_per_service_cache; + reject = pipelined_requests_per_service >= per_service_w->mMaxPipelinedRequests; + equal = pipelined_requests_per_service == per_service_w->mMaxPipelinedRequests; increment_threshold = per_service_w->mRequestStarvation; decrement_threshold = per_service_w->mQueueFull && !per_service_w->mQueueEmpty; // Reset flags. per_service_w->mQueueFull = per_service_w->mQueueEmpty = per_service_w->mRequestStarvation = false; // Grab per service bandwidth object. 
http_bandwidth_ptr = &per_service_w->bandwidth(); - } - if (decrement_threshold) - { - if (max_pipelined_requests_per_service_cache > (S32)curl_concurrent_connections_per_service) + if (decrement_threshold) { - --max_pipelined_requests_per_service; + if (per_service_w->mMaxPipelinedRequests > per_service_w->mConcurrentConnections) + { + per_service_w->mMaxPipelinedRequests--; + } } - } - else if (increment_threshold && reject) - { - if (max_pipelined_requests_per_service_cache < 2 * (S32)curl_concurrent_connections_per_service) + else if (increment_threshold && reject) { - max_pipelined_requests_per_service++; - // Immediately take the new threshold into account. - reject = !equal; + if (per_service_w->mMaxPipelinedRequests < 2 * per_service_w->mConcurrentConnections) + { + per_service_w->mMaxPipelinedRequests++; + // Immediately take the new threshold into account. + reject = !equal; + } } } if (reject)