diff --git a/indra/llmessage/aicurlperservice.cpp b/indra/llmessage/aicurlperservice.cpp index 61693d91f..a0493f1a3 100644 --- a/indra/llmessage/aicurlperservice.cpp +++ b/indra/llmessage/aicurlperservice.cpp @@ -72,10 +72,9 @@ using namespace AICurlPrivate; AIPerService::AIPerService(void) : mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms. - mConcurrectConnections(CurlConcurrentConnectionsPerService), + mConcurrentConnections(CurlConcurrentConnectionsPerService), + mApprovedRequests(0), mTotalAdded(0), - mApprovedFirst(0), - mUnapprovedFirst(0), mUsedCT(0), mCTInUse(0) { @@ -88,7 +87,7 @@ AIPerService::CapabilityType::CapabilityType(void) : mFlags(0), mDownloading(0), mMaxPipelinedRequests(CurlConcurrentConnectionsPerService), - mConcurrectConnections(CurlConcurrentConnectionsPerService) + mConcurrentConnections(CurlConcurrentConnectionsPerService) { } @@ -298,20 +297,20 @@ void AIPerService::redivide_connections(void) else { // Give every other type (that is not in use) one connection, so they can be used (at which point they'll get more). - mCapabilityType[order[i]].mConcurrectConnections = 1; + mCapabilityType[order[i]].mConcurrentConnections = 1; } } // Keep one connection in reserve for currently unused capability types (that have been used before). int reserve = (mUsedCT != mCTInUse) ? 1 : 0; - // Distribute (mConcurrectConnections - reserve) over number_of_capability_types_in_use. - U16 max_connections_per_CT = (mConcurrectConnections - reserve) / number_of_capability_types_in_use + 1; + // Distribute (mConcurrentConnections - reserve) over number_of_capability_types_in_use. + U16 max_connections_per_CT = (mConcurrentConnections - reserve) / number_of_capability_types_in_use + 1; // The first count CTs get max_connections_per_CT connections. - int count = (mConcurrectConnections - reserve) % number_of_capability_types_in_use; + int count = (mConcurrentConnections - reserve) % number_of_capability_types_in_use; for(int i = 1, j = 0;; --i) { while (j < count) { - mCapabilityType[used_order[j++]].mConcurrectConnections = max_connections_per_CT; + mCapabilityType[used_order[j++]].mConcurrentConnections = max_connections_per_CT; } if (i == 0) { @@ -322,7 +321,7 @@ void AIPerService::redivide_connections(void) // Never assign 0 as maximum. if (max_connections_per_CT > 1) { - // The remaining CTs get one connection less so that the sum of all assigned connections is mConcurrectConnections - reserve. + // The remaining CTs get one connection less so that the sum of all assigned connections is mConcurrentConnections - reserve. --max_connections_per_CT; } } @@ -330,8 +329,8 @@ void AIPerService::redivide_connections(void) bool AIPerService::throttled(AICapabilityType capability_type) const { - return mTotalAdded >= mConcurrectConnections || - mCapabilityType[capability_type].mAdded >= mCapabilityType[capability_type].mConcurrectConnections; + return mTotalAdded >= mConcurrentConnections || + mCapabilityType[capability_type].mAdded >= mCapabilityType[capability_type].mConcurrentConnections; } void AIPerService::added_to_multi_handle(AICapabilityType capability_type) @@ -365,7 +364,10 @@ bool AIPerService::queue(AICurlEasyRequest const& easy_request, AICapabilityType if (needs_queuing) { queued_requests.push_back(easy_request.get_ptr()); - TotalQueued_wat(sTotalQueued)->count++; + if (is_approved(capability_type)) + { + TotalQueued_wat(sTotalQueued)->approved++; + } } return needs_queuing; } @@ -391,105 +393,139 @@ bool AIPerService::cancel(AICurlEasyRequest const& easy_request, AICapabilityTyp prev = cur; } mCapabilityType[capability_type].mQueuedRequests.pop_back(); // if this is safe. - TotalQueued_wat total_queued_w(sTotalQueued); - total_queued_w->count--; - llassert(total_queued_w->count >= 0); + if (is_approved(capability_type)) + { + TotalQueued_wat total_queued_w(sTotalQueued); + llassert(total_queued_w->approved > 0); + total_queued_w->approved--; + } return true; } -void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle, bool recursive) +void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle, bool only_this_service) { - int order[number_of_capability_types]; - // The first two types are approved types, they should be the first to try. - // Try the one that has the largest queue first, if they the queues have equal size, try mApprovedFirst first. - size_t s0 = mCapabilityType[0].mQueuedRequests.size(); - size_t s1 = mCapabilityType[1].mQueuedRequests.size(); - if (s0 == s1) + U32 success = 0; // The CTs that we successfully added a request for from the queue. + bool success_this_pass = false; + int i = 0; + // The first pass we only look at CTs with 0 requests added to the multi handle. Subsequent passes only non-zero ones. + for (int pass = 0;; ++i) { - order[0] = mApprovedFirst; - mApprovedFirst = 1 - mApprovedFirst; - order[1] = mApprovedFirst; - } - else if (s0 > s1) - { - order[0] = 0; - order[1] = 1; - } - else - { - order[0] = 1; - order[1] = 0; - } - // The next two types are unapproved types. Here, try them alternating regardless of queue size. - int n = mUnapprovedFirst; - for (int i = 2; i < number_of_capability_types; ++i, n = (n + 1) % (number_of_capability_types - 2)) - { - order[i] = 2 + n; - } - mUnapprovedFirst = (mUnapprovedFirst + 1) % (number_of_capability_types - 2); - - for (int i = 0; i < number_of_capability_types; ++i) - { - CapabilityType& ct(mCapabilityType[order[i]]); - if (!ct.mQueuedRequests.empty()) + if (i == number_of_capability_types) { - if (!multi_handle->add_easy_request(ct.mQueuedRequests.front(), true)) + i = 0; + // Keep trying until we couldn't add anything anymore. + if (pass++ && !success_this_pass) { - // Throttled. If this failed then every capability type will fail: we either are using too much bandwidth, or too many total connections. - // However, it MAY be that this service was thottled for using too much bandwidth by itself. Look if other services can be added. + // Done. break; } - // Request was added, remove it from the queue. - ct.mQueuedRequests.pop_front(); - if (ct.mQueuedRequests.empty()) - { - // We obtained a request from the queue, and after that there we no more request in the queue of this service. - ct.mFlags |= ctf_empty; - } - else - { - // We obtained a request from the queue, and even after that there was at least one more request in the queue of this service. - ct.mFlags |= ctf_full; - } - TotalQueued_wat total_queued_w(sTotalQueued); - llassert(total_queued_w->count > 0); - if (!--(total_queued_w->count)) - { - // We obtained a request from the queue, and after that there we no more request in any queue. - total_queued_w->empty = true; - } - else - { - // We obtained a request from the queue, and even after that there was at least one more request in some queue. - total_queued_w->full = true; - } - // We added something from a queue, so we're done. - return; + success_this_pass = false; } - else + CapabilityType& ct(mCapabilityType[i]); + if (!pass != !ct.mAdded) // Does mAdded match what we're looking for (first mAdded == 0, then mAdded != 0)? + { + continue; + } + if (multi_handle->added_maximum()) + { + // We hit the maximum number of global connections. Abort every attempt to add anything. + only_this_service = true; + break; + } + if (mTotalAdded >= mConcurrentConnections) + { + // We hit the maximum number of connections for this service. Abort any attempt to add anything to this service. + break; + } + if (ct.mAdded >= ct.mConcurrentConnections) + { + // We hit the maximum number of connections for this capability type. Try the next one. + continue; + } + U32 mask = CT2mask((AICapabilityType)i); + if (ct.mQueuedRequests.empty()) // Is there anything in the queue (left) at all? { // We could add a new request, but there is none in the queue! // Note that if this service does not serve this capability type, // then obviously this queue was empty; however, in that case // this variable will never be looked at, so it's ok to set it. - ct.mFlags |= ctf_starvation; + ct.mFlags |= ((success & mask) ? ctf_empty : ctf_starvation); } - if (i == number_of_capability_types - 1) + else { - // Last entry also empty. All queues of this service were empty. Check total connections. - TotalQueued_wat total_queued_w(sTotalQueued); - if (total_queued_w->count == 0) + // Attempt to add the front of the queue. + if (!multi_handle->add_easy_request(ct.mQueuedRequests.front(), true)) { - // The queue of every service is empty! - total_queued_w->starvation = true; - return; + // If that failed then we got throttled on bandwidth because the maximum number of connections were not reached yet. + // Therefore this will keep failing for this service, we abort any additional attempt to add something for this service. + break; + } + // Request was added, remove it from the queue. + ct.mQueuedRequests.pop_front(); + // Mark that at least one request of this CT was successfully added. + success |= mask; + success_this_pass = true; + // Update approved count. + if (is_approved((AICapabilityType)i)) + { + TotalQueued_wat total_queued_w(sTotalQueued); + llassert(total_queued_w->approved > 0); + total_queued_w->approved--; } } } - if (recursive) + + size_t queuedapproved_size = 0; + for (int i = 0; i < number_of_capability_types; ++i) + { + CapabilityType& ct(mCapabilityType[i]); + U32 mask = CT2mask((AICapabilityType)i); + // Add up the size of all queues with approved requests. + if ((approved_mask & mask)) + { + queuedapproved_size += ct.mQueuedRequests.size(); + } + // Skip CTs that we didn't add anything for. + if (!(success & mask)) + { + continue; + } + if (!ct.mQueuedRequests.empty()) + { + // We obtained one or more requests from the queue, and even after that there was at least one more request in the queue of this CT. + ct.mFlags |= ctf_full; + } + } + + // Update the flags of sTotalQueued. + { + TotalQueued_wat total_queued_w(sTotalQueued); + if (total_queued_w->approved == 0) + { + if ((success & approved_mask)) + { + // We obtained an approved request from the queue, and after that there were no more requests in any (approved) queue. + total_queued_w->empty = true; + } + else + { + // Every queue of every approved CT is empty! + total_queued_w->starvation = true; + } + } + else if ((success & approved_mask)) + { + // We obtained an approved request from the queue, and even after that there was at least one more request in some (approved) queue. + total_queued_w->full = true; + } + } + + // Don't try other services if anything was added successfully. + if (success || only_this_service) { return; } + // Nothing from this service could be added, try other services. instance_map_wat instance_map_w(sInstanceMap); for (iterator service = instance_map_w->begin(); service != instance_map_w->end(); ++service) @@ -516,8 +552,11 @@ void AIPerService::purge(void) { size_t s = per_service_w->mCapabilityType[i].mQueuedRequests.size(); per_service_w->mCapabilityType[i].mQueuedRequests.clear(); - total_queued_w->count -= s; - llassert(total_queued_w->count >= 0); + if (is_approved((AICapabilityType)i)) + { + llassert(total_queued_w->approved >= s); + total_queued_w->approved -= s; + } } } } @@ -529,16 +568,16 @@ void AIPerService::adjust_concurrent_connections(int increment) for (AIPerService::iterator iter = instance_map_w->begin(); iter != instance_map_w->end(); ++iter) { PerService_wat per_service_w(*iter->second); - U16 old_concurrent_connections = per_service_w->mConcurrectConnections; + U16 old_concurrent_connections = per_service_w->mConcurrentConnections; int new_concurrent_connections = llclamp(old_concurrent_connections + increment, 1, (int)CurlConcurrentConnectionsPerService); - per_service_w->mConcurrectConnections = (U16)new_concurrent_connections; - increment = per_service_w->mConcurrectConnections - old_concurrent_connections; + per_service_w->mConcurrentConnections = (U16)new_concurrent_connections; + increment = per_service_w->mConcurrentConnections - old_concurrent_connections; for (int i = 0; i < number_of_capability_types; ++i) { per_service_w->mCapabilityType[i].mMaxPipelinedRequests = llmax(per_service_w->mCapabilityType[i].mMaxPipelinedRequests + increment, 0); int new_concurrent_connections_per_capability_type = - llclamp((new_concurrent_connections * per_service_w->mCapabilityType[i].mConcurrectConnections + old_concurrent_connections / 2) / old_concurrent_connections, 1, new_concurrent_connections); - per_service_w->mCapabilityType[i].mConcurrectConnections = (U16)new_concurrent_connections_per_capability_type; + llclamp((new_concurrent_connections * per_service_w->mCapabilityType[i].mConcurrentConnections + old_concurrent_connections / 2) / old_concurrent_connections, 1, new_concurrent_connections); + per_service_w->mCapabilityType[i].mConcurrentConnections = (U16)new_concurrent_connections_per_capability_type; } } } @@ -554,8 +593,9 @@ void AIPerService::Approvement::honored(void) { mHonored = true; PerService_wat per_service_w(*mPerServicePtr); - llassert(per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests > 0); + llassert(per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests > 0 && per_service_w->mApprovedRequests > 0); per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests--; + per_service_w->mApprovedRequests--; } } diff --git a/indra/llmessage/aicurlperservice.h b/indra/llmessage/aicurlperservice.h index b8d8169ea..0509dfd95 100644 --- a/indra/llmessage/aicurlperservice.h +++ b/indra/llmessage/aicurlperservice.h @@ -87,6 +87,8 @@ enum AICapabilityType { // {Capabilities} [Responders] number_of_capability_types = 4 }; +static U32 const approved_mask = 3; // The mask of cap_texture OR-ed with the mask of cap_inventory. + //----------------------------------------------------------------------------- // AIPerService @@ -137,7 +139,7 @@ class AIPerService { typedef std::deque queued_request_type; queued_request_type mQueuedRequests; // Waiting (throttled) requests. - U16 mApprovedRequests; // The number of approved requests by approveHTTPRequestFor that were not added to the command queue yet. + U16 mApprovedRequests; // The number of approved requests for this CT by approveHTTPRequestFor that were not added to the command queue yet. U16 mQueuedCommands; // Number of add commands (minus remove commands), for this service, in the command queue. U16 mAdded; // Number of active easy handles with this service. U16 mFlags; // ctf_empty: Set to true when the queue becomes precisely empty. @@ -145,7 +147,7 @@ class AIPerService { // ctf_starvation: Set to true when the queue was about to be popped but was already empty. U32 mDownloading; // The number of active easy handles with this service for which data was received. U16 mMaxPipelinedRequests; // The maximum number of accepted requests for this service and (approved) capability type, that didn't finish yet. - U16 mConcurrectConnections; // The maximum number of allowed concurrent connections to the service of this capability type. + U16 mConcurrentConnections; // The maximum number of allowed concurrent connections to the service of this capability type. // Declare, not define, constructor and destructor - in order to avoid instantiation of queued_request_type from header. CapabilityType(void); @@ -158,10 +160,9 @@ class AIPerService { CapabilityType mCapabilityType[number_of_capability_types]; AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second. - int mConcurrectConnections; // The maximum number of allowed concurrent connections to this service. + int mConcurrentConnections; // The maximum number of allowed concurrent connections to this service. + int mApprovedRequests; // The number of approved requests for this service by approveHTTPRequestFor that were not added to the command queue yet. int mTotalAdded; // Number of active easy handles with this service. - int mApprovedFirst; // First capability type to try. - int mUnapprovedFirst; // First capability type to try after all approved types were tried. U32 mUsedCT; // Bit mask with one bit per capability type. A '1' means the capability was in use since the last resetUsedCT(). U32 mCTInUse; // Bit mask with one bit per capability type. A '1' means the capability is in use right now. @@ -196,6 +197,7 @@ class AIPerService { } } public: + static bool is_approved(AICapabilityType capability_type) { return (((U32)1 << capability_type) & approved_mask); } static U32 CT2mask(AICapabilityType capability_type) { return (U32)1 << capability_type; } void resetUsedCt(void) { mUsedCT = mCTInUse; } bool is_used(AICapabilityType capability_type) const { return (mUsedCT & CT2mask(capability_type)); } @@ -208,34 +210,34 @@ class AIPerService { // Global administration of the total number of queued requests of all services combined. private: struct TotalQueued { - S32 count; // The sum of mQueuedRequests.size() of all AIPerService objects together. - bool empty; // Set to true when count becomes precisely zero as the result of popping any queue. - bool full; // Set to true when count is still larger than zero after popping any queue. - bool starvation; // Set to true when any queue was about to be popped when count was already zero. - TotalQueued(void) : count(0), empty(false), full(false), starvation(false) { } + S32 approved; // The sum of mQueuedRequests.size() of all AIPerService::CapabilityType objects of approved types. + bool empty; // Set to true when approved becomes precisely zero as the result of popping any queue. + bool full; // Set to true when approved is still larger than zero after popping any queue. + bool starvation; // Set to true when any queue was about to be popped when approved was already zero. + TotalQueued(void) : approved(0), empty(false), full(false), starvation(false) { } }; static AIThreadSafeSimpleDC sTotalQueued; typedef AIAccessConst TotalQueued_crat; typedef AIAccess TotalQueued_rat; typedef AIAccess TotalQueued_wat; public: - static S32 total_queued_size(void) { return TotalQueued_rat(sTotalQueued)->count; } + static S32 total_approved_queue_size(void) { return TotalQueued_rat(sTotalQueued)->approved; } // Global administration of the maximum number of pipelined requests of all services combined. private: struct MaxPipelinedRequests { - S32 count; // The maximum total number of accepted requests that didn't finish yet. + S32 threshold; // The maximum total number of accepted requests that didn't finish yet. U64 last_increment; // Last time that sMaxPipelinedRequests was incremented. U64 last_decrement; // Last time that sMaxPipelinedRequests was decremented. - MaxPipelinedRequests(void) : count(32), last_increment(0), last_decrement(0) { } + MaxPipelinedRequests(void) : threshold(32), last_increment(0), last_decrement(0) { } }; static AIThreadSafeSimpleDC sMaxPipelinedRequests; typedef AIAccessConst MaxPipelinedRequests_crat; typedef AIAccess MaxPipelinedRequests_rat; typedef AIAccess MaxPipelinedRequests_wat; public: - static void setMaxPipelinedRequests(S32 count) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count = count; } - static void incrementMaxPipelinedRequests(S32 increment) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count += increment; } + static void setMaxPipelinedRequests(S32 threshold) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->threshold = threshold; } + static void incrementMaxPipelinedRequests(S32 increment) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->threshold += increment; } // Global administration of throttle fraction (which is the same for all services). private: @@ -260,11 +262,12 @@ class AIPerService { void removed_from_multi_handle(AICapabilityType capability_type, bool downloaded_something); // Called when an easy handle for this service is removed again from the multi handle. void download_started(AICapabilityType capability_type) { ++mCapabilityType[capability_type].mDownloading; } bool throttled(AICapabilityType capability_type) const; // Returns true if the maximum number of allowed requests for this service/capability type have been added to the multi handle. + bool nothing_added(AICapabilityType capability_type) const { return mCapabilityType[capability_type].mAdded == 0; } bool queue(AICurlEasyRequest const& easy_request, AICapabilityType capability_type, bool force_queuing = true); // Add easy_request to the queue if queue is empty or force_queuing. bool cancel(AICurlEasyRequest const& easy_request, AICapabilityType capability_type); // Remove easy_request from the queue (if it's there). - void add_queued_to(AICurlPrivate::curlthread::MultiHandle* mh, bool recursive = false); + void add_queued_to(AICurlPrivate::curlthread::MultiHandle* mh, bool only_this_service = false); // Add queued easy handle (if any) to the multi handle. The request is removed from the queue, // followed by either a call to added_to_multi_handle() or to queue() to add it back. diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index 98d57cdcf..a29859048 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -1710,7 +1710,7 @@ CURLMsg const* MultiHandle::info_read(int* msgs_in_queue) const return ret; } -static U32 curl_max_total_concurrent_connections = 32; // Initialized on start up by startCurlThread(). +U32 curl_max_total_concurrent_connections = 32; // Initialized on start up by startCurlThread(). bool MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request, bool from_queue) { @@ -1724,13 +1724,23 @@ bool MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request, bool f if (!from_queue) { // Add the request to the back of a non-empty queue. - if (PerService_wat(*per_service)->queue(easy_request, capability_type, false)) + PerService_wat per_service_w(*per_service); + if (per_service_w->queue(easy_request, capability_type, false)) { // The queue was not empty, therefore the request was queued. #ifdef SHOW_ASSERT // Not active yet, but it's no longer an error if next we try to remove the request. curl_easy_request_w->mRemovedPerCommand = false; #endif + // This is a fail-safe. Normally, if there is anything in the queue then things should + // be running (normally an attempt is made to add from the queue whenever a request + // finishes). However, it CAN happen on occassion that things get 'stuck' with + // nothing running, so nothing will ever finish and therefore the queue would never + // be checked. Only do this when there is indeed nothing running (added) though. + if (per_service_w->nothing_added(capability_type)) + { + per_service_w->add_queued_to(this); + } return true; } } @@ -2618,7 +2628,7 @@ U32 getNumHTTPCommands(void) U32 getNumHTTPQueued(void) { - return AIPerService::total_queued_size(); + return AIPerService::total_approved_queue_size(); } U32 getNumHTTPAdded(void) @@ -2666,6 +2676,51 @@ bool AIPerService::sNoHTTPBandwidthThrottling; // AIPerService::mQueuedRequests and the already running requests // (in MultiHandle::mAddedEasyRequests)). // +// A request has two types of reasons why it can be throttled: +// 1) The number of connections. +// 2) Bandwidth usage. +// And three levels where each can occur: +// a) Global +// b) Service +// c) Capability Type (CT) +// Currently, not all of those are in use. The ones that are used are: +// +// | Global | Service | CT +// +--------+---------+-------- +// 1) The number of connections | X | X | X +// 2) Bandwidth usage | X | | +// +// Pre-approved requests have the bandwidth tested here, and the +// connections tested in the curl thread, right before they are +// added to the multi handle. +// +// The "pipeline" is as follows: +// +// // If the number of requests in the pipeline is less than a threshold +// | // and the global bandwidth usage is not too large. +// V +// +// | +// V +// +// | +// V +// // If the number of connections at all three levels allow it. +// | +// V +// +// +// Every time this function is called, but not more often than once every 40 ms, the state +// of the CT queue is checked to be starvation, empty or full. If it is starvation +// then the threshold for allowed number of connections is incremented by one, +// if it is empty then nother is done and when it is full then the threshold is +// decremented by one. +// +// Starvation means that we could add a request from the queue to the multi handle, +// but the queue was empty. Empty means that after adding one or more requests to the +// multi handle the queue became empty, and full means that after adding one of more +// requests to the multi handle the queue still wasn't empty (see AIPerService::add_queued_to). +// //static AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr const& per_service, AICapabilityType capability_type) { @@ -2677,13 +2732,13 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c // Cache all sTotalQueued info. bool starvation, decrement_threshold; - S32 total_queued_or_added = MultiHandle::total_added_size(); + S32 total_approved_queuedapproved_or_added = MultiHandle::total_added_size(); { TotalQueued_wat total_queued_w(sTotalQueued); - total_queued_or_added += total_queued_w->count; + total_approved_queuedapproved_or_added += total_queued_w->approved; starvation = total_queued_w->starvation; decrement_threshold = total_queued_w->full && !total_queued_w->empty; - total_queued_w->empty = total_queued_w->full = false; // Reset flags. + total_queued_w->starvation = total_queued_w->empty = total_queued_w->full = false; // Reset flags. } // Whether or not we're going to approve a new request, decrement the global threshold first, when appropriate. @@ -2691,13 +2746,13 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c if (decrement_threshold) { MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests); - if (max_pipelined_requests_w->count > (S32)curl_max_total_concurrent_connections && + if (max_pipelined_requests_w->threshold > (S32)curl_max_total_concurrent_connections && sTime_40ms > max_pipelined_requests_w->last_decrement) { // Decrement the threshold because since the last call to this function at least one curl request finished // and was replaced with another request from the queue, but the queue never ran empty: we have too many // queued requests. - max_pipelined_requests_w->count--; + max_pipelined_requests_w->threshold--; // Do this at most once every 40 ms. max_pipelined_requests_w->last_decrement = sTime_40ms; } @@ -2717,14 +2772,14 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c ct.mFlags = 0; if (decrement_threshold) { - if ((int)ct.mMaxPipelinedRequests > ct.mConcurrectConnections) + if ((int)ct.mMaxPipelinedRequests > ct.mConcurrentConnections) { ct.mMaxPipelinedRequests--; } } else if (increment_threshold && reject) { - if ((int)ct.mMaxPipelinedRequests < 2 * ct.mConcurrectConnections) + if ((int)ct.mMaxPipelinedRequests < 2 * ct.mConcurrentConnections) { ct.mMaxPipelinedRequests++; // Immediately take the new threshold into account. @@ -2736,7 +2791,9 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c // Before releasing the lock on per_service, stop other threads from getting a // too small value from pipelined_requests() and approving too many requests. ct.mApprovedRequests++; + per_service_w->mApprovedRequests++; } + total_approved_queuedapproved_or_added += per_service_w->mApprovedRequests; } if (reject) { @@ -2748,13 +2805,15 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c if (checkBandwidthUsage(per_service, sTime_40ms)) { // Too much bandwidth is being used, either in total or for this service. - PerService_wat(*per_service)->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. + PerService_wat per_service_w(*per_service); + per_service_w->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. + per_service_w->mApprovedRequests--; return NULL; } // Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate. - S32 const pipelined_requests = command_queue_rat(command_queue)->size + total_queued_or_added; + S32 const pipelined_requests = command_queue_rat(command_queue)->size + total_approved_queuedapproved_or_added; // We can't take the command being processed (command_being_processed) into account without // introducing relatively long waiting times for some mutex (namely between when the command // is moved from command_queue to command_being_processed, till it's actually being added to @@ -2763,18 +2822,19 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c // here instead. // The maximum number of requests that may be queued in command_queue is equal to the total number of requests - // that may exist in the pipeline minus the number of requests queued in AIPerService objects, minus - // the number of already running requests. + // that may exist in the pipeline minus the number approved requests not yet added to the command queue, minus the + // number of requests queued in AIPerService objects, minus the number of already running requests + // (excluding non-approved requests queued in their CT queue). MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests); - reject = pipelined_requests >= max_pipelined_requests_w->count; - equal = pipelined_requests == max_pipelined_requests_w->count; + reject = pipelined_requests >= max_pipelined_requests_w->threshold; + equal = pipelined_requests == max_pipelined_requests_w->threshold; increment_threshold = starvation; if (increment_threshold && reject) { - if (max_pipelined_requests_w->count < 2 * (S32)curl_max_total_concurrent_connections && + if (max_pipelined_requests_w->threshold < 2 * (S32)curl_max_total_concurrent_connections && sTime_40ms > max_pipelined_requests_w->last_increment) { - max_pipelined_requests_w->count++; + max_pipelined_requests_w->threshold++; max_pipelined_requests_w->last_increment = sTime_40ms; // Immediately take the new threshold into account. reject = !equal; @@ -2782,7 +2842,9 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c } if (reject) { - PerService_wat(*per_service)->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. + PerService_wat per_service_w(*per_service); + per_service_w->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. + per_service_w->mApprovedRequests--; return NULL; } return new Approvement(per_service, capability_type); diff --git a/indra/llmessage/aicurlthread.h b/indra/llmessage/aicurlthread.h index 9a4c2b487..080456652 100644 --- a/indra/llmessage/aicurlthread.h +++ b/indra/llmessage/aicurlthread.h @@ -39,6 +39,8 @@ namespace AICurlPrivate { namespace curlthread { +extern U32 curl_max_total_concurrent_connections; + class PollSet; // For ordering a std::set with AICurlEasyRequest objects. @@ -100,6 +102,9 @@ class MultiHandle : public CurlMultiHandle // Return the total number of added curl requests. static U32 total_added_size(void) { return sTotalAdded; } + // Return true if we reached the global maximum number of connections. + static bool added_maximum(void) { return sTotalAdded >= curl_max_total_concurrent_connections; } + public: //----------------------------------------------------------------------------- // Curl socket administration: diff --git a/indra/newview/aihttpview.cpp b/indra/newview/aihttpview.cpp index 84bb4dc59..86849575f 100644 --- a/indra/newview/aihttpview.cpp +++ b/indra/newview/aihttpview.cpp @@ -87,7 +87,7 @@ void AIServiceBar::draw() is_used = per_service_r->is_used(); is_inuse = per_service_r->is_inuse(); total_added = per_service_r->mTotalAdded; - concurrent_connections = per_service_r->mConcurrectConnections; + concurrent_connections = per_service_r->mConcurrentConnections; bandwidth = per_service_r->bandwidth().truncateData(AIHTTPView::getTime_40ms()); cts = per_service_r->mCapabilityType; // Not thread-safe, but we're only reading from it and only using the results to show in a debug console. } @@ -103,11 +103,11 @@ void AIServiceBar::draw() } else if (col < 2) { - text = llformat(" | %hu-%hu-%lu,{%hu/%hu,%u}/%u", ct.mApprovedRequests, ct.mQueuedCommands, ct.mQueuedRequests.size(), ct.mAdded, ct.mConcurrectConnections, ct.mDownloading, ct.mMaxPipelinedRequests); + text = llformat(" | %hu-%hu-%lu,{%hu/%hu,%u}/%u", ct.mApprovedRequests, ct.mQueuedCommands, ct.mQueuedRequests.size(), ct.mAdded, ct.mConcurrentConnections, ct.mDownloading, ct.mMaxPipelinedRequests); } else { - text = llformat(" | --%hu-%lu,{%hu/%hu,%u}", ct.mQueuedCommands, ct.mQueuedRequests.size(), ct.mAdded, ct.mConcurrectConnections, ct.mDownloading); + text = llformat(" | --%hu-%lu,{%hu/%hu,%u}", ct.mQueuedCommands, ct.mQueuedRequests.size(), ct.mAdded, ct.mConcurrentConnections, ct.mDownloading); } LLFontGL::getFontMonospace()->renderUTF8(text, 0, start, height, ((is_inuse & mask) == 0) ? LLColor4::grey2 : text_color, LLFontGL::LEFT, LLFontGL::TOP); start += LLFontGL::getFontMonospace()->getWidth(text); diff --git a/indra/newview/llappviewer.cpp b/indra/newview/llappviewer.cpp index 5cb16e6d9..29a339eca 100644 --- a/indra/newview/llappviewer.cpp +++ b/indra/newview/llappviewer.cpp @@ -644,7 +644,7 @@ bool LLAppViewer::init() AIHTTPTimeoutPolicy policy_tmp( "CurlTimeout* Debug Settings", gSavedSettings.getU32("CurlTimeoutDNSLookup"), - gSavedSettings.getU32("CurlTimeoutConnect"), + /*gSavedSettings.getU32("CurlTimeoutConnect") Temporary HACK: 30 is the current max*/ 30, gSavedSettings.getU32("CurlTimeoutReplyDelay"), gSavedSettings.getU32("CurlTimeoutLowSpeedTime"), gSavedSettings.getU32("CurlTimeoutLowSpeedLimit"),