diff --git a/indra/llmessage/aicurl.cpp b/indra/llmessage/aicurl.cpp index 2db6f5ee2..a9c15c5c4 100644 --- a/indra/llmessage/aicurl.cpp +++ b/indra/llmessage/aicurl.cpp @@ -1251,7 +1251,7 @@ AIPerServicePtr CurlEasyRequest::getPerServicePtr(void) bool CurlEasyRequest::removeFromPerServiceQueue(AICurlEasyRequest const& easy_request) const { // Note that easy_request (must) represent(s) this object; it's just passed for convenience. - return mPerServicePtr && PerServiceRequestQueue_wat(*mPerServicePtr)->cancel(easy_request); + return mPerServicePtr && PerService_wat(*mPerServicePtr)->cancel(easy_request); } std::string CurlEasyRequest::getLowercaseHostname(void) const diff --git a/indra/llmessage/aicurlperservice.cpp b/indra/llmessage/aicurlperservice.cpp index 28721c1e9..44b650596 100644 --- a/indra/llmessage/aicurlperservice.cpp +++ b/indra/llmessage/aicurlperservice.cpp @@ -42,10 +42,7 @@ #include "llcontrol.h" AIPerService::threadsafe_instance_map_type AIPerService::sInstanceMap; -LLAtomicS32 AIPerService::sTotalQueued; -bool AIPerService::sQueueEmpty; -bool AIPerService::sQueueFull; -bool AIPerService::sRequestStarvation; +AIThreadSafeSimpleDC AIPerService::sTotalQueued; #undef AICurlPrivate @@ -54,14 +51,14 @@ namespace AICurlPrivate { // Cached value of CurlConcurrentConnectionsPerService. U32 CurlConcurrentConnectionsPerService; -// Friend functions of RefCountedThreadSafePerServiceRequestQueue +// Friend functions of RefCountedThreadSafePerService -void intrusive_ptr_add_ref(RefCountedThreadSafePerServiceRequestQueue* per_service) +void intrusive_ptr_add_ref(RefCountedThreadSafePerService* per_service) { per_service->mReferenceCount++; } -void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* per_service) +void intrusive_ptr_release(RefCountedThreadSafePerService* per_service) { if (--per_service->mReferenceCount == 0) { @@ -194,7 +191,7 @@ AIPerServicePtr AIPerService::instance(std::string const& servicename) AIPerService::iterator iter = instance_map_w->find(servicename); if (iter == instance_map_w->end()) { - iter = instance_map_w->insert(instance_map_type::value_type(servicename, new RefCountedThreadSafePerServiceRequestQueue)).first; + iter = instance_map_w->insert(instance_map_type::value_type(servicename, new RefCountedThreadSafePerService)).first; } // Note: the creation of AIPerServicePtr MUST be protected by the lock on sInstanceMap (see release()). return iter->second; @@ -219,7 +216,7 @@ void AIPerService::release(AIPerServicePtr& instance) return; } // The reference in the map is the last one; that means there can't be any curl easy requests queued for this host. - llassert(PerServiceRequestQueue_rat(*instance)->mQueuedRequests.empty()); + llassert(PerService_rat(*instance)->mQueuedRequests.empty()); // Find the host and erase it from the map. iterator const end = instance_map_w->end(); for(iterator iter = instance_map_w->begin(); iter != end; ++iter) @@ -256,7 +253,7 @@ void AIPerService::removed_from_multi_handle(void) void AIPerService::queue(AICurlEasyRequest const& easy_request) { mQueuedRequests.push_back(easy_request.get_ptr()); - sTotalQueued++; + TotalQueued_wat(sTotalQueued)->count++; } bool AIPerService::cancel(AICurlEasyRequest const& easy_request) @@ -280,8 +277,9 @@ bool AIPerService::cancel(AICurlEasyRequest const& easy_request) prev = cur; } mQueuedRequests.pop_back(); // if this is safe. - --sTotalQueued; - llassert(sTotalQueued >= 0); + TotalQueued_wat total_queued_w(sTotalQueued); + total_queued_w->count--; + llassert(total_queued_w->count >= 0); return true; } @@ -291,17 +289,6 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle) { multi_handle->add_easy_request(mQueuedRequests.front()); mQueuedRequests.pop_front(); - llassert(sTotalQueued > 0); - if (!--sTotalQueued) - { - // We obtained a request from the queue, and after that there we no more request in any queue. - sQueueEmpty = true; - } - else - { - // We obtained a request from the queue, and even after that there was at least one more request in some queue. - sQueueFull = true; - } if (mQueuedRequests.empty()) { // We obtained a request from the queue, and after that there we no more request in the queue of this host. @@ -312,15 +299,28 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle) // We obtained a request from the queue, and even after that there was at least one more request in the queue of this host. mQueueFull = true; } + TotalQueued_wat total_queued_w(sTotalQueued); + llassert(total_queued_w->count > 0); + if (!--(total_queued_w->count)) + { + // We obtained a request from the queue, and after that there we no more request in any queue. + total_queued_w->empty = true; + } + else + { + // We obtained a request from the queue, and even after that there was at least one more request in some queue. + total_queued_w->full = true; + } } else { // We can add a new request, but there is none in the queue! mRequestStarvation = true; - if (sTotalQueued == 0) + TotalQueued_wat total_queued_w(sTotalQueued); + if (total_queued_w->count == 0) { // The queue of every host is empty! - sRequestStarvation = true; + total_queued_w->starvation = true; } } } @@ -332,11 +332,12 @@ void AIPerService::purge(void) for (iterator host = instance_map_w->begin(); host != instance_map_w->end(); ++host) { Dout(dc::curl, "Purging queue of host \"" << host->first << "\"."); - PerServiceRequestQueue_wat per_service_w(*host->second); + PerService_wat per_service_w(*host->second); size_t s = per_service_w->mQueuedRequests.size(); per_service_w->mQueuedRequests.clear(); - sTotalQueued -= s; - llassert(sTotalQueued >= 0); + TotalQueued_wat total_queued_w(sTotalQueued); + total_queued_w->count -= s; + llassert(total_queued_w->count >= 0); } } @@ -346,7 +347,7 @@ void AIPerService::adjust_concurrent_connections(int increment) instance_map_wat instance_map_w(sInstanceMap); for (AIPerService::iterator iter = instance_map_w->begin(); iter != instance_map_w->end(); ++iter) { - PerServiceRequestQueue_wat per_service_w(*iter->second); + PerService_wat per_service_w(*iter->second); U32 old_concurrent_connections = per_service_w->mConcurrectConnections; per_service_w->mConcurrectConnections = llclamp(old_concurrent_connections + increment, (U32)1, CurlConcurrentConnectionsPerService); increment = per_service_w->mConcurrectConnections - old_concurrent_connections; diff --git a/indra/llmessage/aicurlperservice.h b/indra/llmessage/aicurlperservice.h index 6b33b5c1b..3d7e7fe6d 100644 --- a/indra/llmessage/aicurlperservice.h +++ b/indra/llmessage/aicurlperservice.h @@ -53,7 +53,7 @@ class AIPerService; namespace AICurlPrivate { namespace curlthread { class MultiHandle; } -class RefCountedThreadSafePerServiceRequestQueue; +class RefCountedThreadSafePerService; class ThreadSafeBufferedCurlEasyRequest; // Forward declaration of BufferedCurlEasyRequestPtr (see aicurlprivate.h). @@ -61,16 +61,16 @@ typedef boost::intrusive_ptr BufferedCurlEasy // AIPerService objects are created by the curl thread and destructed by the main thread. // We need locking. -typedef AIThreadSafeSimpleDC threadsafe_PerServiceRequestQueue; -typedef AIAccessConst PerServiceRequestQueue_crat; -typedef AIAccess PerServiceRequestQueue_rat; -typedef AIAccess PerServiceRequestQueue_wat; +typedef AIThreadSafeSimpleDC threadsafe_PerService; +typedef AIAccessConst PerService_crat; +typedef AIAccess PerService_rat; +typedef AIAccess PerService_wat; } // namespace AICurlPrivate -// We can't put threadsafe_PerServiceRequestQueue in a std::map because you can't copy a mutex. +// We can't put threadsafe_PerService in a std::map because you can't copy a mutex. // Therefore, use an intrusive pointer for the threadsafe type. -typedef boost::intrusive_ptr AIPerServicePtr; +typedef boost::intrusive_ptr AIPerServicePtr; //----------------------------------------------------------------------------- // AIPerService @@ -89,7 +89,7 @@ class AIPerService { static threadsafe_instance_map_type sInstanceMap; // Map of AIPerService instances with the hostname as key. - friend class AIThreadSafeSimpleDC; // threadsafe_PerServiceRequestQueue + friend class AIThreadSafeSimpleDC; // threadsafe_PerService AIPerService(void); public: @@ -118,27 +118,60 @@ class AIPerService { int mAdded; // Number of active easy handles with this host. queued_request_type mQueuedRequests; // Waiting (throttled) requests. - static LLAtomicS32 sTotalQueued; // The sum of mQueuedRequests.size() of all AIPerService objects together. - bool mQueueEmpty; // Set to true when the queue becomes precisely empty. bool mQueueFull; // Set to true when the queue is popped and then still isn't empty; bool mRequestStarvation; // Set to true when the queue was about to be popped but was already empty. - static bool sQueueEmpty; // Set to true when sTotalQueued becomes precisely zero as the result of popping any queue. - static bool sQueueFull; // Set to true when sTotalQueued is still larger than zero after popping any queue. - static bool sRequestStarvation; // Set to true when any queue was about to be popped when sTotalQueued was already zero. - AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second. int mConcurrectConnections; // The maximum number of allowed concurrent connections to this service. int mMaxPipelinedRequests; // The maximum number of accepted requests for this service that didn't finish yet. - static LLAtomicS32 sMaxPipelinedRequests; // The maximum total number of accepted requests that didn't finish yet. - static U64 sLastTime_sMaxPipelinedRequests_increment; // Last time that sMaxPipelinedRequests was incremented. - static U64 sLastTime_sMaxPipelinedRequests_decrement; // Last time that sMaxPipelinedRequests was decremented. - static U64 sLastTime_ThrottleFractionAverage_add; // Last time that sThrottleFraction was added to sThrottleFractionAverage. - static AIThreadSafeSimpleDC sThrottleFraction; // A value between 0 and 1024: each service is throttled when it uses more than max_bandwidth * (sThrottleFraction/1024) bandwidth. - static AIAverage sThrottleFractionAverage; // Average of sThrottleFraction over 25 * 40ms = 1 second. - static size_t sHTTPThrottleBandwidth125; // HTTPThrottleBandwidth times 125 (in bytes/s). + // Global administration of the total number of queued requests of all services combined. + private: + struct TotalQueued { + S32 count; // The sum of mQueuedRequests.size() of all AIPerService objects together. + bool empty; // Set to true when count becomes precisely zero as the result of popping any queue. + bool full; // Set to true when count is still larger than zero after popping any queue. + bool starvation; // Set to true when any queue was about to be popped when count was already zero. + TotalQueued(void) : count(0), empty(false), full(false), starvation(false) { } + }; + static AIThreadSafeSimpleDC sTotalQueued; + typedef AIAccessConst TotalQueued_crat; + typedef AIAccess TotalQueued_rat; + typedef AIAccess TotalQueued_wat; + public: + static S32 total_queued_size(void) { return TotalQueued_rat(sTotalQueued)->count; } + + // Global administration of the maximum number of pipelined requests of all services combined. + private: + struct MaxPipelinedRequests { + S32 count; // The maximum total number of accepted requests that didn't finish yet. + U64 last_increment; // Last time that sMaxPipelinedRequests was incremented. + U64 last_decrement; // Last time that sMaxPipelinedRequests was decremented. + MaxPipelinedRequests(void) : count(32), last_increment(0), last_decrement(0) { } + }; + static AIThreadSafeSimpleDC sMaxPipelinedRequests; + typedef AIAccessConst MaxPipelinedRequests_crat; + typedef AIAccess MaxPipelinedRequests_rat; + typedef AIAccess MaxPipelinedRequests_wat; + public: + static void setMaxPipelinedRequests(S32 count) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count = count; } + static void incrementMaxPipelinedRequests(S32 increment) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count += increment; } + + // Global administration of throttle fraction (which is the same for all services). + private: + struct ThrottleFraction { + U32 fraction; // A value between 0 and 1024: each service is throttled when it uses more than max_bandwidth * (sThrottleFraction/1024) bandwidth. + AIAverage average; // Average of fraction over 25 * 40ms = 1 second. + U64 last_add; // Last time that faction was added to average. + ThrottleFraction(void) : fraction(1024), average(25), last_add(0) { } + }; + static AIThreadSafeSimpleDC sThrottleFraction; + typedef AIAccessConst ThrottleFraction_crat; + typedef AIAccess ThrottleFraction_rat; + typedef AIAccess ThrottleFraction_wat; + + static LLAtomicU32 sHTTPThrottleBandwidth125; // HTTPThrottleBandwidth times 125 (in bytes/s). static bool sNoHTTPBandwidthThrottling; // Global override to disable bandwidth throttling. public: @@ -156,7 +189,6 @@ class AIPerService { // followed by either a call to added_to_multi_handle() or to queue() to add it back. S32 pipelined_requests(void) const { return mQueuedCommands + mQueuedRequests.size() + mAdded; } - static S32 total_queued_size(void) { return sTotalQueued; } AIAverage& bandwidth(void) { return mHTTPBandwidth; } AIAverage const& bandwidth(void) const { return mHTTPBandwidth; } @@ -168,15 +200,17 @@ class AIPerService { // Called when CurlConcurrentConnectionsPerService changes. static void adjust_concurrent_connections(int increment); + // The two following functions are static and have the AIPerService object passed + // as first argument as an AIPerServicePtr because that avoids the need of having + // the AIPerService object locked for the whole duration of the call. + // The functions only lock it when access is required. + // Returns true if curl can handle another request for this host. // Should return false if the maximum allowed HTTP bandwidth is reached, or when // the latency between request and actual delivery becomes too large. static bool wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service); // Return true if too much bandwidth is being used. - static bool checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth_ptr); - - // Accessor for when curl_max_total_concurrent_connections changes. - static LLAtomicS32& maxPipelinedRequests(void) { return sMaxPipelinedRequests; } + static bool checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms); private: // Disallow copying. @@ -185,17 +219,17 @@ class AIPerService { namespace AICurlPrivate { -class RefCountedThreadSafePerServiceRequestQueue : public threadsafe_PerServiceRequestQueue { +class RefCountedThreadSafePerService : public threadsafe_PerService { public: - RefCountedThreadSafePerServiceRequestQueue(void) : mReferenceCount(0) { } + RefCountedThreadSafePerService(void) : mReferenceCount(0) { } bool exactly_two_left(void) const { return mReferenceCount == 2; } private: // Used by AIPerServicePtr. Object is deleted when reference count reaches zero. LLAtomicU32 mReferenceCount; - friend void intrusive_ptr_add_ref(RefCountedThreadSafePerServiceRequestQueue* p); - friend void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* p); + friend void intrusive_ptr_add_ref(RefCountedThreadSafePerService* p); + friend void intrusive_ptr_release(RefCountedThreadSafePerService* p); }; extern U32 CurlConcurrentConnectionsPerService; diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index b0eb7ca55..6f0da9608 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -1331,11 +1331,11 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w) case cmd_boost: // FIXME: future stuff break; case cmd_add: - PerServiceRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue(); + PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue(); multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request())); break; case cmd_remove: - PerServiceRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'. + PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'. multi_handle_w->remove_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), true); break; } @@ -1708,10 +1708,8 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) { AICurlEasyRequest_wat curl_easy_request_w(*easy_request); per_service = curl_easy_request_w->getPerServicePtr(); - bool too_much_bandwidth = curl_easy_request_w->queueIfTooMuchBandwidthUsage() && - AIPerService::checkBandwidthUsage(get_clock_count() * HTTPTimeout::sClockWidth_40ms, - PerServiceRequestQueue_wat(*per_service)->bandwidth()); - PerServiceRequestQueue_wat per_service_w(*per_service); + bool too_much_bandwidth = curl_easy_request_w->queueIfTooMuchBandwidthUsage() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms); + PerService_wat per_service_w(*per_service); if (!too_much_bandwidth && mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled()) { curl_easy_request_w->set_timeout_opts(); @@ -1736,7 +1734,7 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) return; } // The request could not be added, we have to queue it. - PerServiceRequestQueue_wat(*per_service)->queue(easy_request); + PerService_wat(*per_service)->queue(easy_request); #ifdef SHOW_ASSERT // Not active yet, but it's no longer an error if next we try to remove the request. AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = false; @@ -1774,7 +1772,7 @@ CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator cons AICurlEasyRequest_wat curl_easy_request_w(**iter); res = curl_easy_request_w->remove_handle_from_multi(curl_easy_request_w, mMultiHandle); per_service = curl_easy_request_w->getPerServicePtr(); - PerServiceRequestQueue_wat(*per_service)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests. + PerService_wat(*per_service)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests. #ifdef SHOW_ASSERT curl_easy_request_w->mRemovedPerCommand = as_per_command; #endif @@ -1791,7 +1789,7 @@ CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator cons #endif // Attempt to add a queued request, if any. - PerServiceRequestQueue_wat(*per_service)->add_queued_to(this); + PerService_wat(*per_service)->add_queued_to(this); return res; } @@ -2124,7 +2122,7 @@ void BufferedCurlEasyRequest::update_body_bandwidth(void) if (raw_bytes > 0) { U64 const sTime_40ms = curlthread::HTTPTimeout::sTime_10ms >> 2; - AIAverage& http_bandwidth(PerServiceRequestQueue_wat(*getPerServicePtr())->bandwidth()); + AIAverage& http_bandwidth(PerService_wat(*getPerServicePtr())->bandwidth()); http_bandwidth.addData(raw_bytes, sTime_40ms); sHTTPBandwidth.addData(raw_bytes, sTime_40ms); } @@ -2219,7 +2217,7 @@ size_t BufferedCurlEasyRequest::curlHeaderCallback(char* data, size_t size, size } // Update HTTP bandwidth. U64 const sTime_40ms = curlthread::HTTPTimeout::sTime_10ms >> 2; - AIAverage& http_bandwidth(PerServiceRequestQueue_wat(*self_w->getPerServicePtr())->bandwidth()); + AIAverage& http_bandwidth(PerService_wat(*self_w->getPerServicePtr())->bandwidth()); http_bandwidth.addData(header_len, sTime_40ms); sHTTPBandwidth.addData(header_len, sTime_40ms); // Update timeout administration. This must be done after the status is already known. @@ -2425,7 +2423,7 @@ void AICurlEasyRequest::addRequest(void) command_queue_w->commands.push_back(Command(*this, cmd_add)); command_queue_w->size++; AICurlEasyRequest_wat curl_easy_request_w(*get()); - PerServiceRequestQueue_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue(); + PerService_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue(); curl_easy_request_w->add_queued(); } // Something was added to the queue, wake up the thread to get it. @@ -2489,7 +2487,7 @@ void AICurlEasyRequest::removeRequest(void) command_queue_w->commands.push_back(Command(*this, cmd_remove)); command_queue_w->size--; AICurlEasyRequest_wat curl_easy_request_w(*get()); - PerServiceRequestQueue_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(); // Note really, but this has the same effect as 'added a remove command'. + PerService_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(); // Note really, but this has the same effect as 'added a remove command'. // Suppress warning that would otherwise happen if the callbacks are revoked before the curl thread removed the request. curl_easy_request_w->remove_queued(); } @@ -2515,7 +2513,7 @@ void startCurlThread(LLControlGroup* control_group) curl_max_total_concurrent_connections = sConfigGroup->getU32("CurlMaxTotalConcurrentConnections"); CurlConcurrentConnectionsPerService = sConfigGroup->getU32("CurlConcurrentConnectionsPerService"); gNoVerifySSLCert = sConfigGroup->getBOOL("NoVerifySSLCert"); - AIPerService::maxPipelinedRequests() = curl_max_total_concurrent_connections; + AIPerService::setMaxPipelinedRequests(curl_max_total_concurrent_connections); AIPerService::setHTTPThrottleBandwidth(sConfigGroup->getF32("HTTPThrottleBandwidth")); AICurlThread::sInstance = new AICurlThread; @@ -2529,7 +2527,7 @@ bool handleCurlMaxTotalConcurrentConnections(LLSD const& newvalue) U32 old = curl_max_total_concurrent_connections; curl_max_total_concurrent_connections = newvalue.asInteger(); - AIPerService::maxPipelinedRequests() += curl_max_total_concurrent_connections - old; + AIPerService::incrementMaxPipelinedRequests(curl_max_total_concurrent_connections - old); llinfos << "CurlMaxTotalConcurrentConnections set to " << curl_max_total_concurrent_connections << llendl; return true; } @@ -2580,13 +2578,9 @@ size_t getHTTPBandwidth(void) } // namespace AICurlInterface // Global AIPerService members. -U64 AIPerService::sLastTime_sMaxPipelinedRequests_increment = 0; -U64 AIPerService::sLastTime_sMaxPipelinedRequests_decrement = 0; -LLAtomicS32 AIPerService::sMaxPipelinedRequests(32); -U64 AIPerService::sLastTime_ThrottleFractionAverage_add = 0; -AIThreadSafeSimpleDC AIPerService::sThrottleFraction(1024); -AIAverage AIPerService::sThrottleFractionAverage(25); -size_t AIPerService::sHTTPThrottleBandwidth125 = 250000; +AIThreadSafeSimpleDC AIPerService::sMaxPipelinedRequests; +AIThreadSafeSimpleDC AIPerService::sThrottleFraction; +LLAtomicU32 AIPerService::sHTTPThrottleBandwidth125(250000); bool AIPerService::sNoHTTPBandwidthThrottling; // Return true if we want at least one more HTTP request for this host. @@ -2619,38 +2613,42 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) using namespace AICurlPrivate; using namespace AICurlPrivate::curlthread; - bool reject, equal, increment_threshold, decrement_threshold; - // Do certain things at most once every 40ms. U64 const sTime_40ms = get_clock_count() * HTTPTimeout::sClockWidth_40ms; // Time in 40ms units. - // Atomic read sMaxPipelinedRequests for the below calculations. - S32 const max_pipelined_requests_cache = sMaxPipelinedRequests; + // Cache all sTotalQueued info. + bool starvation, decrement_threshold; + S32 total_queued_or_added = MultiHandle::total_added_size(); + { + TotalQueued_wat total_queued_w(sTotalQueued); + total_queued_or_added += total_queued_w->count; + starvation = total_queued_w->starvation; + decrement_threshold = total_queued_w->full && !total_queued_w->empty; + total_queued_w->empty = total_queued_w->full = false; // Reset flags. + } // Whether or not we're going to approve a new request, decrement the global threshold first, when appropriate. - decrement_threshold = sQueueFull && !sQueueEmpty; - sQueueEmpty = sQueueFull = false; // Reset flags. if (decrement_threshold) { - if (max_pipelined_requests_cache > (S32)curl_max_total_concurrent_connections && - sTime_40ms > sLastTime_sMaxPipelinedRequests_decrement) + MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests); + if (max_pipelined_requests_w->count > (S32)curl_max_total_concurrent_connections && + sTime_40ms > max_pipelined_requests_w->last_decrement) { // Decrement the threshold because since the last call to this function at least one curl request finished // and was replaced with another request from the queue, but the queue never ran empty: we have too many // queued requests. - --sMaxPipelinedRequests; + max_pipelined_requests_w->count--; // Do this at most once every 40 ms. - sLastTime_sMaxPipelinedRequests_decrement = sTime_40ms; + max_pipelined_requests_w->last_decrement = sTime_40ms; } } // Check if it's ok to get a new request for this particular service and update the per-service threshold. - AIAverage* http_bandwidth_ptr; - + bool reject, equal, increment_threshold; { - PerServiceRequestQueue_wat per_service_w(*per_service); + PerService_wat per_service_w(*per_service); S32 const pipelined_requests_per_service = per_service_w->pipelined_requests(); reject = pipelined_requests_per_service >= per_service_w->mMaxPipelinedRequests; equal = pipelined_requests_per_service == per_service_w->mMaxPipelinedRequests; @@ -2658,8 +2656,6 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) decrement_threshold = per_service_w->mQueueFull && !per_service_w->mQueueEmpty; // Reset flags. per_service_w->mQueueFull = per_service_w->mQueueEmpty = per_service_w->mRequestStarvation = false; - // Grab per service bandwidth object. - http_bandwidth_ptr = &per_service_w->bandwidth(); if (decrement_threshold) { if (per_service_w->mMaxPipelinedRequests > per_service_w->mConcurrectConnections) @@ -2684,7 +2680,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) } // Throttle on bandwidth usage. - if (checkBandwidthUsage(sTime_40ms, *http_bandwidth_ptr)) + if (checkBandwidthUsage(per_service, sTime_40ms)) { // Too much bandwidth is being used, either in total or for this service. return false; @@ -2692,30 +2688,28 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) // Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate. - { - command_queue_rat command_queue_r(command_queue); - S32 const pipelined_requests = command_queue_r->size + sTotalQueued + MultiHandle::total_added_size(); - // We can't take the command being processed (command_being_processed) into account without - // introducing relatively long waiting times for some mutex (namely between when the command - // is moved from command_queue to command_being_processed, till it's actually being added to - // mAddedEasyRequests). The whole purpose of command_being_processed is to reduce the time - // that things are locked to micro seconds, so we'll just accept an off-by-one fuzziness - // here instead. + S32 const pipelined_requests = command_queue_rat(command_queue)->size + total_queued_or_added; + // We can't take the command being processed (command_being_processed) into account without + // introducing relatively long waiting times for some mutex (namely between when the command + // is moved from command_queue to command_being_processed, till it's actually being added to + // mAddedEasyRequests). The whole purpose of command_being_processed is to reduce the time + // that things are locked to micro seconds, so we'll just accept an off-by-one fuzziness + // here instead. - // The maximum number of requests that may be queued in command_queue is equal to the total number of requests - // that may exist in the pipeline minus the number of requests queued in AIPerService objects, minus - // the number of already running requests. - reject = pipelined_requests >= max_pipelined_requests_cache; - equal = pipelined_requests == max_pipelined_requests_cache; - increment_threshold = sRequestStarvation; - } + // The maximum number of requests that may be queued in command_queue is equal to the total number of requests + // that may exist in the pipeline minus the number of requests queued in AIPerService objects, minus + // the number of already running requests. + MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests); + reject = pipelined_requests >= max_pipelined_requests_w->count; + equal = pipelined_requests == max_pipelined_requests_w->count; + increment_threshold = starvation; if (increment_threshold && reject) { - if (max_pipelined_requests_cache < 2 * (S32)curl_max_total_concurrent_connections && - sTime_40ms > sLastTime_sMaxPipelinedRequests_increment) + if (max_pipelined_requests_w->count < 2 * (S32)curl_max_total_concurrent_connections && + sTime_40ms > max_pipelined_requests_w->last_increment) { - sMaxPipelinedRequests++; - sLastTime_sMaxPipelinedRequests_increment = sTime_40ms; + max_pipelined_requests_w->count++; + max_pipelined_requests_w->last_increment = sTime_40ms; // Immediately take the new threshold into account. reject = !equal; } @@ -2723,7 +2717,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) return !reject; } -bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth) +bool AIPerService::checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms) { if (sNoHTTPBandwidthThrottling) return false; @@ -2732,27 +2726,26 @@ bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth // Truncate the sums to the last second, and get their value. size_t const max_bandwidth = AIPerService::getHTTPThrottleBandwidth125(); - size_t const total_bandwidth = BufferedCurlEasyRequest::sHTTPBandwidth.truncateData(sTime_40ms); // Bytes received in the past second. - size_t const service_bandwidth = http_bandwidth.truncateData(sTime_40ms); // Idem for just this service. - AIAccess throttle_fraction_w(sThrottleFraction); - // Note that sLastTime_ThrottleFractionAverage_add is protected by the lock on sThrottleFraction... - if (sTime_40ms > sLastTime_ThrottleFractionAverage_add) + size_t const total_bandwidth = BufferedCurlEasyRequest::sHTTPBandwidth.truncateData(sTime_40ms); // Bytes received in the past second. + size_t const service_bandwidth = PerService_wat(*per_service)->bandwidth().truncateData(sTime_40ms); // Idem for just this service. + ThrottleFraction_wat throttle_fraction_w(sThrottleFraction); + if (sTime_40ms > throttle_fraction_w->last_add) { - sThrottleFractionAverage.addData(*throttle_fraction_w, sTime_40ms); - // Only add sThrottleFraction once every 40 ms at most. + throttle_fraction_w->average.addData(throttle_fraction_w->fraction, sTime_40ms); + // Only add throttle_fraction_w->fraction once every 40 ms at most. // It's ok to ignore other values in the same 40 ms because the value only changes on the scale of 1 second. - sLastTime_ThrottleFractionAverage_add = sTime_40ms; + throttle_fraction_w->last_add = sTime_40ms; } - double fraction_avg = sThrottleFractionAverage.getAverage(1024.0); // sThrottleFraction averaged over the past second, or 1024 if there is no data. + double fraction_avg = throttle_fraction_w->average.getAverage(1024.0); // throttle_fraction_w->fraction averaged over the past second, or 1024 if there is no data. - // Adjust sThrottleFraction based on total bandwidth usage. + // Adjust the fraction based on total bandwidth usage. if (total_bandwidth == 0) - *throttle_fraction_w = 1024; + throttle_fraction_w->fraction = 1024; else { // This is the main formula. It can be made plausible by assuming // an equilibrium where total_bandwidth == max_bandwidth and - // thus sThrottleFraction == fraction_avg for more than a second. + // thus fraction == fraction_avg for more than a second. // // Then, more bandwidth is being used (for example because another // service starts downloading). Assuming that all services that use @@ -2765,23 +2758,21 @@ bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth // For example, let max_bandwidth be 1. Let there be two throttled // services, each using 0.5 (fraction_avg = 1024/2). Let the new // service use what it can: also 0.5 - then without reduction the - // total_bandwidth would become 1.5, and sThrottleFraction would + // total_bandwidth would become 1.5, and fraction would // become (1024/2) * 1/1.5 = 1024/3: from 2 to 3 services. // // In reality, total_bandwidth would rise linear from 1.0 to 1.5 in // one second if the throttle fraction wasn't changed. However it is // changed here. The end result is that any change more or less // linear fades away in one second. - *throttle_fraction_w = fraction_avg * max_bandwidth / total_bandwidth + 0.5; + throttle_fraction_w->fraction = llmin(1024., fraction_avg * max_bandwidth / total_bandwidth + 0.5); } - if (*throttle_fraction_w > 1024) - *throttle_fraction_w = 1024; if (total_bandwidth > max_bandwidth) { - *throttle_fraction_w *= 0.95; + throttle_fraction_w->fraction *= 0.95; } // Throttle this service if it uses too much bandwidth. - return (service_bandwidth > (max_bandwidth * *throttle_fraction_w / 1024)); + return (service_bandwidth > (max_bandwidth * throttle_fraction_w->fraction / 1024)); }