From f74ad5eafa84ad53591e261478967894a816aa04 Mon Sep 17 00:00:00 2001 From: Aleric Inglewood Date: Thu, 27 Jun 2013 19:57:08 +0200 Subject: [PATCH] Fix global connection threshold not to count non-approved queued request as being in the pipeline. This make more sense when comparing the threshold against 2 times the maximum allowed total connections. As a result, textures won't stall when there are around 128 mesh requests queued. --- indra/llmessage/aicurlperservice.cpp | 61 ++++++++++++++------- indra/llmessage/aicurlperservice.h | 26 +++++---- indra/llmessage/aicurlthread.cpp | 82 +++++++++++++++++++++++----- 3 files changed, 122 insertions(+), 47 deletions(-) diff --git a/indra/llmessage/aicurlperservice.cpp b/indra/llmessage/aicurlperservice.cpp index f0c391a5b..a0493f1a3 100644 --- a/indra/llmessage/aicurlperservice.cpp +++ b/indra/llmessage/aicurlperservice.cpp @@ -73,6 +73,7 @@ using namespace AICurlPrivate; AIPerService::AIPerService(void) : mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms. mConcurrentConnections(CurlConcurrentConnectionsPerService), + mApprovedRequests(0), mTotalAdded(0), mUsedCT(0), mCTInUse(0) @@ -363,7 +364,10 @@ bool AIPerService::queue(AICurlEasyRequest const& easy_request, AICapabilityType if (needs_queuing) { queued_requests.push_back(easy_request.get_ptr()); - TotalQueued_wat(sTotalQueued)->count++; + if (is_approved(capability_type)) + { + TotalQueued_wat(sTotalQueued)->approved++; + } } return needs_queuing; } @@ -389,9 +393,12 @@ bool AIPerService::cancel(AICurlEasyRequest const& easy_request, AICapabilityTyp prev = cur; } mCapabilityType[capability_type].mQueuedRequests.pop_back(); // if this is safe. - TotalQueued_wat total_queued_w(sTotalQueued); - total_queued_w->count--; - llassert(total_queued_w->count >= 0); + if (is_approved(capability_type)) + { + TotalQueued_wat total_queued_w(sTotalQueued); + llassert(total_queued_w->approved > 0); + total_queued_w->approved--; + } return true; } @@ -458,23 +465,26 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle, bool onl // Mark that at least one request of this CT was successfully added. success |= mask; success_this_pass = true; - // Update count and the empty flag of sTotalQueued. - TotalQueued_wat total_queued_w(sTotalQueued); - llassert(total_queued_w->count > 0); - if (!--(total_queued_w->count)) + // Update approved count. + if (is_approved((AICapabilityType)i)) { - // We obtained a request from the queue, and after that there we no more request in any queue. - total_queued_w->empty = true; - // Since there is no request left anywhere anymore, abort looking for one. - break; + TotalQueued_wat total_queued_w(sTotalQueued); + llassert(total_queued_w->approved > 0); + total_queued_w->approved--; } } } + size_t queuedapproved_size = 0; for (int i = 0; i < number_of_capability_types; ++i) { CapabilityType& ct(mCapabilityType[i]); U32 mask = CT2mask((AICapabilityType)i); + // Add up the size of all queues with approved requests. + if ((approved_mask & mask)) + { + queuedapproved_size += ct.mQueuedRequests.size(); + } // Skip CTs that we didn't add anything for. if (!(success & mask)) { @@ -487,20 +497,25 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle, bool onl } } - // Update the starvation and full flags of sTotalQueued. + // Update the flags of sTotalQueued. { TotalQueued_wat total_queued_w(sTotalQueued); - if (total_queued_w->count == 0) + if (total_queued_w->approved == 0) { - if (!success) + if ((success & approved_mask)) { - // The queue of every service is empty! + // We obtained an approved request from the queue, and after that there were no more requests in any (approved) queue. + total_queued_w->empty = true; + } + else + { + // Every queue of every approved CT is empty! total_queued_w->starvation = true; } } - else if (success) + else if ((success & approved_mask)) { - // We obtained a request from the queue, and even after that there was at least one more request in some queue. + // We obtained an approved request from the queue, and even after that there was at least one more request in some (approved) queue. total_queued_w->full = true; } } @@ -537,8 +552,11 @@ void AIPerService::purge(void) { size_t s = per_service_w->mCapabilityType[i].mQueuedRequests.size(); per_service_w->mCapabilityType[i].mQueuedRequests.clear(); - total_queued_w->count -= s; - llassert(total_queued_w->count >= 0); + if (is_approved((AICapabilityType)i)) + { + llassert(total_queued_w->approved >= s); + total_queued_w->approved -= s; + } } } } @@ -575,8 +593,9 @@ void AIPerService::Approvement::honored(void) { mHonored = true; PerService_wat per_service_w(*mPerServicePtr); - llassert(per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests > 0); + llassert(per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests > 0 && per_service_w->mApprovedRequests > 0); per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests--; + per_service_w->mApprovedRequests--; } } diff --git a/indra/llmessage/aicurlperservice.h b/indra/llmessage/aicurlperservice.h index 8ee621fb3..0509dfd95 100644 --- a/indra/llmessage/aicurlperservice.h +++ b/indra/llmessage/aicurlperservice.h @@ -87,6 +87,8 @@ enum AICapabilityType { // {Capabilities} [Responders] number_of_capability_types = 4 }; +static U32 const approved_mask = 3; // The mask of cap_texture OR-ed with the mask of cap_inventory. + //----------------------------------------------------------------------------- // AIPerService @@ -137,7 +139,7 @@ class AIPerService { typedef std::deque queued_request_type; queued_request_type mQueuedRequests; // Waiting (throttled) requests. - U16 mApprovedRequests; // The number of approved requests by approveHTTPRequestFor that were not added to the command queue yet. + U16 mApprovedRequests; // The number of approved requests for this CT by approveHTTPRequestFor that were not added to the command queue yet. U16 mQueuedCommands; // Number of add commands (minus remove commands), for this service, in the command queue. U16 mAdded; // Number of active easy handles with this service. U16 mFlags; // ctf_empty: Set to true when the queue becomes precisely empty. @@ -159,6 +161,7 @@ class AIPerService { AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second. int mConcurrentConnections; // The maximum number of allowed concurrent connections to this service. + int mApprovedRequests; // The number of approved requests for this service by approveHTTPRequestFor that were not added to the command queue yet. int mTotalAdded; // Number of active easy handles with this service. U32 mUsedCT; // Bit mask with one bit per capability type. A '1' means the capability was in use since the last resetUsedCT(). @@ -194,6 +197,7 @@ class AIPerService { } } public: + static bool is_approved(AICapabilityType capability_type) { return (((U32)1 << capability_type) & approved_mask); } static U32 CT2mask(AICapabilityType capability_type) { return (U32)1 << capability_type; } void resetUsedCt(void) { mUsedCT = mCTInUse; } bool is_used(AICapabilityType capability_type) const { return (mUsedCT & CT2mask(capability_type)); } @@ -206,34 +210,34 @@ class AIPerService { // Global administration of the total number of queued requests of all services combined. private: struct TotalQueued { - S32 count; // The sum of mQueuedRequests.size() of all AIPerService objects together. - bool empty; // Set to true when count becomes precisely zero as the result of popping any queue. - bool full; // Set to true when count is still larger than zero after popping any queue. - bool starvation; // Set to true when any queue was about to be popped when count was already zero. - TotalQueued(void) : count(0), empty(false), full(false), starvation(false) { } + S32 approved; // The sum of mQueuedRequests.size() of all AIPerService::CapabilityType objects of approved types. + bool empty; // Set to true when approved becomes precisely zero as the result of popping any queue. + bool full; // Set to true when approved is still larger than zero after popping any queue. + bool starvation; // Set to true when any queue was about to be popped when approved was already zero. + TotalQueued(void) : approved(0), empty(false), full(false), starvation(false) { } }; static AIThreadSafeSimpleDC sTotalQueued; typedef AIAccessConst TotalQueued_crat; typedef AIAccess TotalQueued_rat; typedef AIAccess TotalQueued_wat; public: - static S32 total_queued_size(void) { return TotalQueued_rat(sTotalQueued)->count; } + static S32 total_approved_queue_size(void) { return TotalQueued_rat(sTotalQueued)->approved; } // Global administration of the maximum number of pipelined requests of all services combined. private: struct MaxPipelinedRequests { - S32 count; // The maximum total number of accepted requests that didn't finish yet. + S32 threshold; // The maximum total number of accepted requests that didn't finish yet. U64 last_increment; // Last time that sMaxPipelinedRequests was incremented. U64 last_decrement; // Last time that sMaxPipelinedRequests was decremented. - MaxPipelinedRequests(void) : count(32), last_increment(0), last_decrement(0) { } + MaxPipelinedRequests(void) : threshold(32), last_increment(0), last_decrement(0) { } }; static AIThreadSafeSimpleDC sMaxPipelinedRequests; typedef AIAccessConst MaxPipelinedRequests_crat; typedef AIAccess MaxPipelinedRequests_rat; typedef AIAccess MaxPipelinedRequests_wat; public: - static void setMaxPipelinedRequests(S32 count) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count = count; } - static void incrementMaxPipelinedRequests(S32 increment) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count += increment; } + static void setMaxPipelinedRequests(S32 threshold) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->threshold = threshold; } + static void incrementMaxPipelinedRequests(S32 increment) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->threshold += increment; } // Global administration of throttle fraction (which is the same for all services). private: diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index a774cad35..a29859048 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -2628,7 +2628,7 @@ U32 getNumHTTPCommands(void) U32 getNumHTTPQueued(void) { - return AIPerService::total_queued_size(); + return AIPerService::total_approved_queue_size(); } U32 getNumHTTPAdded(void) @@ -2676,6 +2676,51 @@ bool AIPerService::sNoHTTPBandwidthThrottling; // AIPerService::mQueuedRequests and the already running requests // (in MultiHandle::mAddedEasyRequests)). // +// A request has two types of reasons why it can be throttled: +// 1) The number of connections. +// 2) Bandwidth usage. +// And three levels where each can occur: +// a) Global +// b) Service +// c) Capability Type (CT) +// Currently, not all of those are in use. The ones that are used are: +// +// | Global | Service | CT +// +--------+---------+-------- +// 1) The number of connections | X | X | X +// 2) Bandwidth usage | X | | +// +// Pre-approved requests have the bandwidth tested here, and the +// connections tested in the curl thread, right before they are +// added to the multi handle. +// +// The "pipeline" is as follows: +// +// // If the number of requests in the pipeline is less than a threshold +// | // and the global bandwidth usage is not too large. +// V +// +// | +// V +// +// | +// V +// // If the number of connections at all three levels allow it. +// | +// V +// +// +// Every time this function is called, but not more often than once every 40 ms, the state +// of the CT queue is checked to be starvation, empty or full. If it is starvation +// then the threshold for allowed number of connections is incremented by one, +// if it is empty then nother is done and when it is full then the threshold is +// decremented by one. +// +// Starvation means that we could add a request from the queue to the multi handle, +// but the queue was empty. Empty means that after adding one or more requests to the +// multi handle the queue became empty, and full means that after adding one of more +// requests to the multi handle the queue still wasn't empty (see AIPerService::add_queued_to). +// //static AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr const& per_service, AICapabilityType capability_type) { @@ -2687,13 +2732,13 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c // Cache all sTotalQueued info. bool starvation, decrement_threshold; - S32 total_queued_or_added = MultiHandle::total_added_size(); + S32 total_approved_queuedapproved_or_added = MultiHandle::total_added_size(); { TotalQueued_wat total_queued_w(sTotalQueued); - total_queued_or_added += total_queued_w->count; + total_approved_queuedapproved_or_added += total_queued_w->approved; starvation = total_queued_w->starvation; decrement_threshold = total_queued_w->full && !total_queued_w->empty; - total_queued_w->empty = total_queued_w->full = false; // Reset flags. + total_queued_w->starvation = total_queued_w->empty = total_queued_w->full = false; // Reset flags. } // Whether or not we're going to approve a new request, decrement the global threshold first, when appropriate. @@ -2701,13 +2746,13 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c if (decrement_threshold) { MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests); - if (max_pipelined_requests_w->count > (S32)curl_max_total_concurrent_connections && + if (max_pipelined_requests_w->threshold > (S32)curl_max_total_concurrent_connections && sTime_40ms > max_pipelined_requests_w->last_decrement) { // Decrement the threshold because since the last call to this function at least one curl request finished // and was replaced with another request from the queue, but the queue never ran empty: we have too many // queued requests. - max_pipelined_requests_w->count--; + max_pipelined_requests_w->threshold--; // Do this at most once every 40 ms. max_pipelined_requests_w->last_decrement = sTime_40ms; } @@ -2746,7 +2791,9 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c // Before releasing the lock on per_service, stop other threads from getting a // too small value from pipelined_requests() and approving too many requests. ct.mApprovedRequests++; + per_service_w->mApprovedRequests++; } + total_approved_queuedapproved_or_added += per_service_w->mApprovedRequests; } if (reject) { @@ -2758,13 +2805,15 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c if (checkBandwidthUsage(per_service, sTime_40ms)) { // Too much bandwidth is being used, either in total or for this service. - PerService_wat(*per_service)->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. + PerService_wat per_service_w(*per_service); + per_service_w->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. + per_service_w->mApprovedRequests--; return NULL; } // Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate. - S32 const pipelined_requests = command_queue_rat(command_queue)->size + total_queued_or_added; + S32 const pipelined_requests = command_queue_rat(command_queue)->size + total_approved_queuedapproved_or_added; // We can't take the command being processed (command_being_processed) into account without // introducing relatively long waiting times for some mutex (namely between when the command // is moved from command_queue to command_being_processed, till it's actually being added to @@ -2773,18 +2822,19 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c // here instead. // The maximum number of requests that may be queued in command_queue is equal to the total number of requests - // that may exist in the pipeline minus the number of requests queued in AIPerService objects, minus - // the number of already running requests. + // that may exist in the pipeline minus the number approved requests not yet added to the command queue, minus the + // number of requests queued in AIPerService objects, minus the number of already running requests + // (excluding non-approved requests queued in their CT queue). MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests); - reject = pipelined_requests >= max_pipelined_requests_w->count; - equal = pipelined_requests == max_pipelined_requests_w->count; + reject = pipelined_requests >= max_pipelined_requests_w->threshold; + equal = pipelined_requests == max_pipelined_requests_w->threshold; increment_threshold = starvation; if (increment_threshold && reject) { - if (max_pipelined_requests_w->count < 2 * (S32)curl_max_total_concurrent_connections && + if (max_pipelined_requests_w->threshold < 2 * (S32)curl_max_total_concurrent_connections && sTime_40ms > max_pipelined_requests_w->last_increment) { - max_pipelined_requests_w->count++; + max_pipelined_requests_w->threshold++; max_pipelined_requests_w->last_increment = sTime_40ms; // Immediately take the new threshold into account. reject = !equal; @@ -2792,7 +2842,9 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c } if (reject) { - PerService_wat(*per_service)->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. + PerService_wat per_service_w(*per_service); + per_service_w->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. + per_service_w->mApprovedRequests--; return NULL; } return new Approvement(per_service, capability_type);