diff --git a/indra/llcommon/aiframetimer.h b/indra/llcommon/aiframetimer.h index cb50caec5..366581c49 100644 --- a/indra/llcommon/aiframetimer.h +++ b/indra/llcommon/aiframetimer.h @@ -67,7 +67,7 @@ class LL_COMMON_API AIFrameTimer mutable Signal* mCallback; // Pointer to callback struct, or NULL when the object wasn't added to sTimerList yet. public: - AIRunningFrameTimer(F64 expiration, AIFrameTimer* timer) : mExpire(LLFrameTimer::getElapsedSeconds() + expiration), mCallback(NULL), mTimer(timer) { } + AIRunningFrameTimer(F64 expiration, AIFrameTimer* timer) : mExpire(LLFrameTimer::getElapsedSeconds() + expiration), mTimer(timer), mCallback(NULL) { } ~AIRunningFrameTimer() { delete mCallback; } // This function is called after the final object was added to sTimerList (where it is initialized in-place). @@ -89,7 +89,7 @@ class LL_COMMON_API AIFrameTimer #if LL_DEBUG // May not copy this object after it was initialized. AIRunningFrameTimer(AIRunningFrameTimer const& running_frame_timer) : - mExpire(running_frame_timer.mExpire), mCallback(running_frame_timer.mCallback), mTimer(running_frame_timer.mTimer) + mExpire(running_frame_timer.mExpire), mTimer(running_frame_timer.mTimer), mCallback(running_frame_timer.mCallback) { llassert(!mCallback); } #endif }; diff --git a/indra/llcommon/llthread.cpp b/indra/llcommon/llthread.cpp index 8a2b6d0b5..a879485ad 100644 --- a/indra/llcommon/llthread.cpp +++ b/indra/llcommon/llthread.cpp @@ -403,8 +403,8 @@ void LLCondition::broadcast() //============================================================================ LLMutexBase::LLMutexBase() : - mLockingThread(AIThreadID::sNone), - mCount(0) + mCount(0), + mLockingThread(AIThreadID::sNone) { } diff --git a/indra/llmessage/aiaverage.h b/indra/llmessage/aiaverage.h index f8f344df6..ee588bd9f 100644 --- a/indra/llmessage/aiaverage.h +++ b/indra/llmessage/aiaverage.h @@ -52,7 +52,8 @@ class AIAverage { U32 mN; // The number of calls to operator(). int const mNrOfBuckets; // Size of mData. std::vector mData; // The buckets. - LLMutex mLock; // Mutex for all of the above data. + + mutable LLMutex mLock; // Mutex for all of the above data. public: AIAverage(int number_of_buckets) : mCurrentClock(~(U64)0), mTail(0), mCurrentBucket(0), mSum(0), mN(0), mNrOfBuckets(number_of_buckets), mData(number_of_buckets) @@ -88,7 +89,7 @@ class AIAverage { mLock.unlock(); return sum; } - double getAverage(double avg_no_data) + double getAverage(double avg_no_data) const { mLock.lock(); double avg = mSum; diff --git a/indra/llmessage/aicurl.cpp b/indra/llmessage/aicurl.cpp index 06c7f0ab4..a9c15c5c4 100644 --- a/indra/llmessage/aicurl.cpp +++ b/indra/llmessage/aicurl.cpp @@ -1251,7 +1251,7 @@ AIPerServicePtr CurlEasyRequest::getPerServicePtr(void) bool CurlEasyRequest::removeFromPerServiceQueue(AICurlEasyRequest const& easy_request) const { // Note that easy_request (must) represent(s) this object; it's just passed for convenience. - return mPerServicePtr && PerServiceRequestQueue_wat(*mPerServicePtr)->cancel(easy_request); + return mPerServicePtr && PerService_wat(*mPerServicePtr)->cancel(easy_request); } std::string CurlEasyRequest::getLowercaseHostname(void) const @@ -1269,7 +1269,8 @@ LLMutex BufferedCurlEasyRequest::sResponderCallbackMutex; bool BufferedCurlEasyRequest::sShuttingDown = false; AIAverage BufferedCurlEasyRequest::sHTTPBandwidth(25); -BufferedCurlEasyRequest::BufferedCurlEasyRequest() : mRequestTransferedBytes(0), mTotalRawBytes(0), mBufferEventsTarget(NULL), mStatus(HTTP_INTERNAL_ERROR_OTHER) +BufferedCurlEasyRequest::BufferedCurlEasyRequest() : + mRequestTransferedBytes(0), mTotalRawBytes(0), mStatus(HTTP_INTERNAL_ERROR_OTHER), mBufferEventsTarget(NULL), mQueueIfTooMuchBandwidthUsage(false) { AICurlInterface::Stats::BufferedCurlEasyRequest_count++; } diff --git a/indra/llmessage/aicurlperservice.cpp b/indra/llmessage/aicurlperservice.cpp index 28721c1e9..8f8573ea6 100644 --- a/indra/llmessage/aicurlperservice.cpp +++ b/indra/llmessage/aicurlperservice.cpp @@ -42,10 +42,7 @@ #include "llcontrol.h" AIPerService::threadsafe_instance_map_type AIPerService::sInstanceMap; -LLAtomicS32 AIPerService::sTotalQueued; -bool AIPerService::sQueueEmpty; -bool AIPerService::sQueueFull; -bool AIPerService::sRequestStarvation; +AIThreadSafeSimpleDC AIPerService::sTotalQueued; #undef AICurlPrivate @@ -54,14 +51,14 @@ namespace AICurlPrivate { // Cached value of CurlConcurrentConnectionsPerService. U32 CurlConcurrentConnectionsPerService; -// Friend functions of RefCountedThreadSafePerServiceRequestQueue +// Friend functions of RefCountedThreadSafePerService -void intrusive_ptr_add_ref(RefCountedThreadSafePerServiceRequestQueue* per_service) +void intrusive_ptr_add_ref(RefCountedThreadSafePerService* per_service) { per_service->mReferenceCount++; } -void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* per_service) +void intrusive_ptr_release(RefCountedThreadSafePerService* per_service) { if (--per_service->mReferenceCount == 0) { @@ -74,7 +71,7 @@ void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* per_servi using namespace AICurlPrivate; AIPerService::AIPerService(void) : - mQueuedCommands(0), mAdded(0), mQueueEmpty(false), + mApprovedRequests(0), mQueuedCommands(0), mAdded(0), mQueueEmpty(false), mQueueFull(false), mRequestStarvation(false), mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms. mConcurrectConnections(CurlConcurrentConnectionsPerService), mMaxPipelinedRequests(CurlConcurrentConnectionsPerService) @@ -194,7 +191,7 @@ AIPerServicePtr AIPerService::instance(std::string const& servicename) AIPerService::iterator iter = instance_map_w->find(servicename); if (iter == instance_map_w->end()) { - iter = instance_map_w->insert(instance_map_type::value_type(servicename, new RefCountedThreadSafePerServiceRequestQueue)).first; + iter = instance_map_w->insert(instance_map_type::value_type(servicename, new RefCountedThreadSafePerService)).first; } // Note: the creation of AIPerServicePtr MUST be protected by the lock on sInstanceMap (see release()). return iter->second; @@ -219,7 +216,7 @@ void AIPerService::release(AIPerServicePtr& instance) return; } // The reference in the map is the last one; that means there can't be any curl easy requests queued for this host. - llassert(PerServiceRequestQueue_rat(*instance)->mQueuedRequests.empty()); + llassert(PerService_rat(*instance)->mQueuedRequests.empty()); // Find the host and erase it from the map. iterator const end = instance_map_w->end(); for(iterator iter = instance_map_w->begin(); iter != end; ++iter) @@ -256,7 +253,7 @@ void AIPerService::removed_from_multi_handle(void) void AIPerService::queue(AICurlEasyRequest const& easy_request) { mQueuedRequests.push_back(easy_request.get_ptr()); - sTotalQueued++; + TotalQueued_wat(sTotalQueued)->count++; } bool AIPerService::cancel(AICurlEasyRequest const& easy_request) @@ -280,8 +277,9 @@ bool AIPerService::cancel(AICurlEasyRequest const& easy_request) prev = cur; } mQueuedRequests.pop_back(); // if this is safe. - --sTotalQueued; - llassert(sTotalQueued >= 0); + TotalQueued_wat total_queued_w(sTotalQueued); + total_queued_w->count--; + llassert(total_queued_w->count >= 0); return true; } @@ -291,17 +289,6 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle) { multi_handle->add_easy_request(mQueuedRequests.front()); mQueuedRequests.pop_front(); - llassert(sTotalQueued > 0); - if (!--sTotalQueued) - { - // We obtained a request from the queue, and after that there we no more request in any queue. - sQueueEmpty = true; - } - else - { - // We obtained a request from the queue, and even after that there was at least one more request in some queue. - sQueueFull = true; - } if (mQueuedRequests.empty()) { // We obtained a request from the queue, and after that there we no more request in the queue of this host. @@ -312,15 +299,28 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle) // We obtained a request from the queue, and even after that there was at least one more request in the queue of this host. mQueueFull = true; } + TotalQueued_wat total_queued_w(sTotalQueued); + llassert(total_queued_w->count > 0); + if (!--(total_queued_w->count)) + { + // We obtained a request from the queue, and after that there we no more request in any queue. + total_queued_w->empty = true; + } + else + { + // We obtained a request from the queue, and even after that there was at least one more request in some queue. + total_queued_w->full = true; + } } else { // We can add a new request, but there is none in the queue! mRequestStarvation = true; - if (sTotalQueued == 0) + TotalQueued_wat total_queued_w(sTotalQueued); + if (total_queued_w->count == 0) { // The queue of every host is empty! - sRequestStarvation = true; + total_queued_w->starvation = true; } } } @@ -332,11 +332,12 @@ void AIPerService::purge(void) for (iterator host = instance_map_w->begin(); host != instance_map_w->end(); ++host) { Dout(dc::curl, "Purging queue of host \"" << host->first << "\"."); - PerServiceRequestQueue_wat per_service_w(*host->second); + PerService_wat per_service_w(*host->second); size_t s = per_service_w->mQueuedRequests.size(); per_service_w->mQueuedRequests.clear(); - sTotalQueued -= s; - llassert(sTotalQueued >= 0); + TotalQueued_wat total_queued_w(sTotalQueued); + total_queued_w->count -= s; + llassert(total_queued_w->count >= 0); } } @@ -346,7 +347,7 @@ void AIPerService::adjust_concurrent_connections(int increment) instance_map_wat instance_map_w(sInstanceMap); for (AIPerService::iterator iter = instance_map_w->begin(); iter != instance_map_w->end(); ++iter) { - PerServiceRequestQueue_wat per_service_w(*iter->second); + PerService_wat per_service_w(*iter->second); U32 old_concurrent_connections = per_service_w->mConcurrectConnections; per_service_w->mConcurrectConnections = llclamp(old_concurrent_connections + increment, (U32)1, CurlConcurrentConnectionsPerService); increment = per_service_w->mConcurrectConnections - old_concurrent_connections; @@ -354,3 +355,14 @@ void AIPerService::adjust_concurrent_connections(int increment) } } +void AIPerService::Approvement::honored(void) +{ + if (!mHonored) + { + mHonored = true; + AICurlPrivate::PerService_wat per_service_w(*mPerServicePtr); + llassert(per_service_w->mApprovedRequests > 0); + per_service_w->mApprovedRequests--; + } +} + diff --git a/indra/llmessage/aicurlperservice.h b/indra/llmessage/aicurlperservice.h index 20271aeb3..eab41ad80 100644 --- a/indra/llmessage/aicurlperservice.h +++ b/indra/llmessage/aicurlperservice.h @@ -53,7 +53,7 @@ class AIPerService; namespace AICurlPrivate { namespace curlthread { class MultiHandle; } -class RefCountedThreadSafePerServiceRequestQueue; +class RefCountedThreadSafePerService; class ThreadSafeBufferedCurlEasyRequest; // Forward declaration of BufferedCurlEasyRequestPtr (see aicurlprivate.h). @@ -61,25 +61,25 @@ typedef boost::intrusive_ptr BufferedCurlEasy // AIPerService objects are created by the curl thread and destructed by the main thread. // We need locking. -typedef AIThreadSafeSimpleDC threadsafe_PerServiceRequestQueue; -typedef AIAccessConst PerServiceRequestQueue_crat; -typedef AIAccess PerServiceRequestQueue_rat; -typedef AIAccess PerServiceRequestQueue_wat; +typedef AIThreadSafeSimpleDC threadsafe_PerService; +typedef AIAccessConst PerService_crat; +typedef AIAccess PerService_rat; +typedef AIAccess PerService_wat; } // namespace AICurlPrivate -// We can't put threadsafe_PerServiceRequestQueue in a std::map because you can't copy a mutex. +// We can't put threadsafe_PerService in a std::map because you can't copy a mutex. // Therefore, use an intrusive pointer for the threadsafe type. -typedef boost::intrusive_ptr AIPerServicePtr; +typedef boost::intrusive_ptr AIPerServicePtr; //----------------------------------------------------------------------------- // AIPerService -// This class provides a static interface to create and maintain instances -// of AIPerService objects, so that at any moment there is at most -// one instance per hostname:port. Those instances then are used to queue curl -// requests when the maximum number of connections for that host already -// have been reached. +// This class provides a static interface to create and maintain instances of AIPerService objects, +// so that at any moment there is at most one instance per service (hostname:port). +// Those instances then are used to queue curl requests when the maximum number of connections +// for that service already have been reached. And to keep track of the bandwidth usage, and the +// number of queued requests in the pipeline, for this service. class AIPerService { private: typedef std::map instance_map_type; @@ -89,7 +89,7 @@ class AIPerService { static threadsafe_instance_map_type sInstanceMap; // Map of AIPerService instances with the hostname as key. - friend class AIThreadSafeSimpleDC; //threadsafe_PerServiceRequestQueue + friend class AIThreadSafeSimpleDC; // threadsafe_PerService AIPerService(void); public: @@ -114,23 +114,66 @@ class AIPerService { private: typedef std::deque queued_request_type; + int mApprovedRequests; // The number of approved requests by wantsMoreHTTPRequestsFor that were not added to the command queue yet. int mQueuedCommands; // Number of add commands (minus remove commands) with this host in the command queue. int mAdded; // Number of active easy handles with this host. queued_request_type mQueuedRequests; // Waiting (throttled) requests. - static LLAtomicS32 sTotalQueued; // The sum of mQueuedRequests.size() of all AIPerService objects together. - bool mQueueEmpty; // Set to true when the queue becomes precisely empty. bool mQueueFull; // Set to true when the queue is popped and then still isn't empty; bool mRequestStarvation; // Set to true when the queue was about to be popped but was already empty. - static bool sQueueEmpty; // Set to true when sTotalQueued becomes precisely zero as the result of popping any queue. - static bool sQueueFull; // Set to true when sTotalQueued is still larger than zero after popping any queue. - static bool sRequestStarvation; // Set to true when any queue was about to be popped when sTotalQueued was already zero. - AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second. int mConcurrectConnections; // The maximum number of allowed concurrent connections to this service. - int mMaxPipelinedRequests; // The maximum number of accepted requests that didn't finish yet. + int mMaxPipelinedRequests; // The maximum number of accepted requests for this service that didn't finish yet. + + // Global administration of the total number of queued requests of all services combined. + private: + struct TotalQueued { + S32 count; // The sum of mQueuedRequests.size() of all AIPerService objects together. + bool empty; // Set to true when count becomes precisely zero as the result of popping any queue. + bool full; // Set to true when count is still larger than zero after popping any queue. + bool starvation; // Set to true when any queue was about to be popped when count was already zero. + TotalQueued(void) : count(0), empty(false), full(false), starvation(false) { } + }; + static AIThreadSafeSimpleDC sTotalQueued; + typedef AIAccessConst TotalQueued_crat; + typedef AIAccess TotalQueued_rat; + typedef AIAccess TotalQueued_wat; + public: + static S32 total_queued_size(void) { return TotalQueued_rat(sTotalQueued)->count; } + + // Global administration of the maximum number of pipelined requests of all services combined. + private: + struct MaxPipelinedRequests { + S32 count; // The maximum total number of accepted requests that didn't finish yet. + U64 last_increment; // Last time that sMaxPipelinedRequests was incremented. + U64 last_decrement; // Last time that sMaxPipelinedRequests was decremented. + MaxPipelinedRequests(void) : count(32), last_increment(0), last_decrement(0) { } + }; + static AIThreadSafeSimpleDC sMaxPipelinedRequests; + typedef AIAccessConst MaxPipelinedRequests_crat; + typedef AIAccess MaxPipelinedRequests_rat; + typedef AIAccess MaxPipelinedRequests_wat; + public: + static void setMaxPipelinedRequests(S32 count) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count = count; } + static void incrementMaxPipelinedRequests(S32 increment) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count += increment; } + + // Global administration of throttle fraction (which is the same for all services). + private: + struct ThrottleFraction { + U32 fraction; // A value between 0 and 1024: each service is throttled when it uses more than max_bandwidth * (sThrottleFraction/1024) bandwidth. + AIAverage average; // Average of fraction over 25 * 40ms = 1 second. + U64 last_add; // Last time that faction was added to average. + ThrottleFraction(void) : fraction(1024), average(25), last_add(0) { } + }; + static AIThreadSafeSimpleDC sThrottleFraction; + typedef AIAccessConst ThrottleFraction_crat; + typedef AIAccess ThrottleFraction_rat; + typedef AIAccess ThrottleFraction_wat; + + static LLAtomicU32 sHTTPThrottleBandwidth125; // HTTPThrottleBandwidth times 125 (in bytes/s). + static bool sNoHTTPBandwidthThrottling; // Global override to disable bandwidth throttling. public: void added_to_command_queue(void) { ++mQueuedCommands; } @@ -146,19 +189,40 @@ class AIPerService { // Add queued easy handle (if any) to the multi handle. The request is removed from the queue, // followed by either a call to added_to_multi_handle() or to queue() to add it back. - S32 pipelined_requests(void) const { return mQueuedCommands + mQueuedRequests.size() + mAdded; } - static S32 total_queued_size(void) { return sTotalQueued; } + S32 pipelined_requests(void) const { return mApprovedRequests + mQueuedCommands + mQueuedRequests.size() + mAdded; } AIAverage& bandwidth(void) { return mHTTPBandwidth; } AIAverage const& bandwidth(void) const { return mHTTPBandwidth; } + static void setNoHTTPBandwidthThrottling(bool nb) { sNoHTTPBandwidthThrottling = nb; } + static void setHTTPThrottleBandwidth(F32 max_kbps) { sHTTPThrottleBandwidth125 = 125.f * max_kbps; } + static size_t getHTTPThrottleBandwidth125(void) { return sHTTPThrottleBandwidth125; } + // Called when CurlConcurrentConnectionsPerService changes. static void adjust_concurrent_connections(int increment); + // The two following functions are static and have the AIPerService object passed + // as first argument as an AIPerServicePtr because that avoids the need of having + // the AIPerService object locked for the whole duration of the call. + // The functions only lock it when access is required. + // Returns true if curl can handle another request for this host. // Should return false if the maximum allowed HTTP bandwidth is reached, or when // the latency between request and actual delivery becomes too large. - static bool wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service, F32 max_kbps, bool no_bandwidth_throttling); + static bool wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service); + // Return true if too much bandwidth is being used. + static bool checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms); + + // A helper class to decrement mApprovedRequests after requests approved by wantsMoreHTTPRequestsFor were handled. + class Approvement { + private: + AIPerServicePtr mPerServicePtr; + bool mHonored; + public: + Approvement(AIPerServicePtr const& per_service) : mPerServicePtr(per_service), mHonored(false) { } + ~Approvement() { honored(); } + void honored(void); + }; private: // Disallow copying. @@ -167,17 +231,17 @@ class AIPerService { namespace AICurlPrivate { -class RefCountedThreadSafePerServiceRequestQueue : public threadsafe_PerServiceRequestQueue { +class RefCountedThreadSafePerService : public threadsafe_PerService { public: - RefCountedThreadSafePerServiceRequestQueue(void) : mReferenceCount(0) { } + RefCountedThreadSafePerService(void) : mReferenceCount(0) { } bool exactly_two_left(void) const { return mReferenceCount == 2; } private: // Used by AIPerServicePtr. Object is deleted when reference count reaches zero. LLAtomicU32 mReferenceCount; - friend void intrusive_ptr_add_ref(RefCountedThreadSafePerServiceRequestQueue* p); - friend void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* p); + friend void intrusive_ptr_add_ref(RefCountedThreadSafePerService* p); + friend void intrusive_ptr_release(RefCountedThreadSafePerService* p); }; extern U32 CurlConcurrentConnectionsPerService; diff --git a/indra/llmessage/aicurlprivate.h b/indra/llmessage/aicurlprivate.h index 57e6d9210..de3e6b4b1 100644 --- a/indra/llmessage/aicurlprivate.h +++ b/indra/llmessage/aicurlprivate.h @@ -375,6 +375,9 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { void resetState(void); void prepRequest(AICurlEasyRequest_wat& buffered_curl_easy_request_w, AIHTTPHeaders const& headers, LLHTTPClient::ResponderPtr responder); + // Called if this request should be queued on the curl thread when too much bandwidth is being used. + void queue_if_too_much_bandwidth_usage(void) { mQueueIfTooMuchBandwidthUsage = true; } + buffer_ptr_t& getInput(void) { return mInput; } buffer_ptr_t& getOutput(void) { return mOutput; } @@ -416,6 +419,7 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { U32 mRequestTransferedBytes; size_t mTotalRawBytes; // Raw body data (still, possibly, compressed) received from the server so far. AIBufferedCurlEasyRequestEvents* mBufferEventsTarget; + bool mQueueIfTooMuchBandwidthUsage; // Set if the curl thread should check bandwidth usage and queue this request if too much is being used. public: static LLChannelDescriptors const sChannels; // Channel object for mInput (channel out()) and mOutput (channel in()). @@ -452,6 +456,9 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { // Return true when prepRequest was already called and the object has not been // invalidated as a result of calling timed_out(). bool isValid(void) const { return mResponder; } + + // Returns true when this request should be queued by the curl thread when too much bandwidth is being used. + bool queueIfTooMuchBandwidthUsage(void) const { return mQueueIfTooMuchBandwidthUsage; } }; inline ThreadSafeBufferedCurlEasyRequest* CurlEasyRequest::get_lockobj(void) diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index 8b4f1a022..cbd2bc784 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -206,8 +206,6 @@ int ioctlsocket(int fd, int, unsigned long* nonblocking_enable) namespace AICurlPrivate { -LLAtomicS32 max_pipelined_requests(32); - enum command_st { cmd_none, cmd_add, @@ -890,7 +888,7 @@ AICurlThread* AICurlThread::sInstance = NULL; AICurlThread::AICurlThread(void) : LLThread("AICurlThread"), mWakeUpFd_in(CURL_SOCKET_BAD), mWakeUpFd(CURL_SOCKET_BAD), - mZeroTimeout(0), mRunning(true), mWakeUpFlag(false) + mZeroTimeout(0), mWakeUpFlag(false), mRunning(true) { create_wakeup_fds(); sInstance = this; @@ -1333,11 +1331,11 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w) case cmd_boost: // FIXME: future stuff break; case cmd_add: - PerServiceRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue(); + PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue(); multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request())); break; case cmd_remove: - PerServiceRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'. + PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'. multi_handle_w->remove_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), true); break; } @@ -1710,8 +1708,9 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) { AICurlEasyRequest_wat curl_easy_request_w(*easy_request); per_service = curl_easy_request_w->getPerServicePtr(); - PerServiceRequestQueue_wat per_service_w(*per_service); - if (mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled()) + bool too_much_bandwidth = curl_easy_request_w->queueIfTooMuchBandwidthUsage() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms); + PerService_wat per_service_w(*per_service); + if (!too_much_bandwidth && mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled()) { curl_easy_request_w->set_timeout_opts(); if (curl_easy_request_w->add_handle_to_multi(curl_easy_request_w, mMultiHandle) == CURLM_OK) @@ -1723,7 +1722,10 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) } // Release the lock on easy_request. if (!throttled) { // ... to here. - std::pair res = mAddedEasyRequests.insert(easy_request); +#ifdef SHOW_ASSERT + std::pair res = +#endif + mAddedEasyRequests.insert(easy_request); llassert(res.second); // May not have been added before. sTotalAdded++; llassert(sTotalAdded == mAddedEasyRequests.size()); @@ -1732,7 +1734,7 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) return; } // The request could not be added, we have to queue it. - PerServiceRequestQueue_wat(*per_service)->queue(easy_request); + PerService_wat(*per_service)->queue(easy_request); #ifdef SHOW_ASSERT // Not active yet, but it's no longer an error if next we try to remove the request. AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = false; @@ -1770,7 +1772,7 @@ CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator cons AICurlEasyRequest_wat curl_easy_request_w(**iter); res = curl_easy_request_w->remove_handle_from_multi(curl_easy_request_w, mMultiHandle); per_service = curl_easy_request_w->getPerServicePtr(); - PerServiceRequestQueue_wat(*per_service)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests. + PerService_wat(*per_service)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests. #ifdef SHOW_ASSERT curl_easy_request_w->mRemovedPerCommand = as_per_command; #endif @@ -1787,7 +1789,7 @@ CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator cons #endif // Attempt to add a queued request, if any. - PerServiceRequestQueue_wat(*per_service)->add_queued_to(this); + PerService_wat(*per_service)->add_queued_to(this); return res; } @@ -2120,7 +2122,7 @@ void BufferedCurlEasyRequest::update_body_bandwidth(void) if (raw_bytes > 0) { U64 const sTime_40ms = curlthread::HTTPTimeout::sTime_10ms >> 2; - AIAverage& http_bandwidth(PerServiceRequestQueue_wat(*getPerServicePtr())->bandwidth()); + AIAverage& http_bandwidth(PerService_wat(*getPerServicePtr())->bandwidth()); http_bandwidth.addData(raw_bytes, sTime_40ms); sHTTPBandwidth.addData(raw_bytes, sTime_40ms); } @@ -2215,7 +2217,7 @@ size_t BufferedCurlEasyRequest::curlHeaderCallback(char* data, size_t size, size } // Update HTTP bandwidth. U64 const sTime_40ms = curlthread::HTTPTimeout::sTime_10ms >> 2; - AIAverage& http_bandwidth(PerServiceRequestQueue_wat(*self_w->getPerServicePtr())->bandwidth()); + AIAverage& http_bandwidth(PerService_wat(*self_w->getPerServicePtr())->bandwidth()); http_bandwidth.addData(header_len, sTime_40ms); sHTTPBandwidth.addData(header_len, sTime_40ms); // Update timeout administration. This must be done after the status is already known. @@ -2421,7 +2423,7 @@ void AICurlEasyRequest::addRequest(void) command_queue_w->commands.push_back(Command(*this, cmd_add)); command_queue_w->size++; AICurlEasyRequest_wat curl_easy_request_w(*get()); - PerServiceRequestQueue_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue(); + PerService_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue(); curl_easy_request_w->add_queued(); } // Something was added to the queue, wake up the thread to get it. @@ -2485,7 +2487,7 @@ void AICurlEasyRequest::removeRequest(void) command_queue_w->commands.push_back(Command(*this, cmd_remove)); command_queue_w->size--; AICurlEasyRequest_wat curl_easy_request_w(*get()); - PerServiceRequestQueue_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(); // Note really, but this has the same effect as 'added a remove command'. + PerService_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(); // Note really, but this has the same effect as 'added a remove command'. // Suppress warning that would otherwise happen if the callbacks are revoked before the curl thread removed the request. curl_easy_request_w->remove_queued(); } @@ -2511,7 +2513,8 @@ void startCurlThread(LLControlGroup* control_group) curl_max_total_concurrent_connections = sConfigGroup->getU32("CurlMaxTotalConcurrentConnections"); CurlConcurrentConnectionsPerService = sConfigGroup->getU32("CurlConcurrentConnectionsPerService"); gNoVerifySSLCert = sConfigGroup->getBOOL("NoVerifySSLCert"); - max_pipelined_requests = curl_max_total_concurrent_connections; + AIPerService::setMaxPipelinedRequests(curl_max_total_concurrent_connections); + AIPerService::setHTTPThrottleBandwidth(sConfigGroup->getF32("HTTPThrottleBandwidth")); AICurlThread::sInstance = new AICurlThread; AICurlThread::sInstance->start(); @@ -2524,7 +2527,7 @@ bool handleCurlMaxTotalConcurrentConnections(LLSD const& newvalue) U32 old = curl_max_total_concurrent_connections; curl_max_total_concurrent_connections = newvalue.asInteger(); - max_pipelined_requests += curl_max_total_concurrent_connections - old; + AIPerService::incrementMaxPipelinedRequests(curl_max_total_concurrent_connections - old); llinfos << "CurlMaxTotalConcurrentConnections set to " << curl_max_total_concurrent_connections << llendl; return true; } @@ -2574,6 +2577,12 @@ size_t getHTTPBandwidth(void) } // namespace AICurlInterface +// Global AIPerService members. +AIThreadSafeSimpleDC AIPerService::sMaxPipelinedRequests; +AIThreadSafeSimpleDC AIPerService::sThrottleFraction; +LLAtomicU32 AIPerService::sHTTPThrottleBandwidth125(250000); +bool AIPerService::sNoHTTPBandwidthThrottling; + // Return true if we want at least one more HTTP request for this host. // // It's OK if this function is a bit fuzzy, but we don't want it to return @@ -2599,46 +2608,55 @@ size_t getHTTPBandwidth(void) // running requests (in MultiHandle::mAddedEasyRequests)). // //static -bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service, F32 max_kbps, bool no_bandwidth_throttling) +bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) { using namespace AICurlPrivate; using namespace AICurlPrivate::curlthread; - bool reject, equal, increment_threshold, decrement_threshold; + // Do certain things at most once every 40ms. + U64 const sTime_40ms = get_clock_count() * HTTPTimeout::sClockWidth_40ms; // Time in 40ms units. + + // Cache all sTotalQueued info. + bool starvation, decrement_threshold; + S32 total_queued_or_added = MultiHandle::total_added_size(); + { + TotalQueued_wat total_queued_w(sTotalQueued); + total_queued_or_added += total_queued_w->count; + starvation = total_queued_w->starvation; + decrement_threshold = total_queued_w->full && !total_queued_w->empty; + total_queued_w->empty = total_queued_w->full = false; // Reset flags. + } // Whether or not we're going to approve a new request, decrement the global threshold first, when appropriate. - // Atomic read max_pipelined_requests for the below calculations. - S32 const max_pipelined_requests_cache = max_pipelined_requests; - decrement_threshold = sQueueFull && !sQueueEmpty; - // Reset flags. - sQueueEmpty = sQueueFull = false; if (decrement_threshold) { - if (max_pipelined_requests_cache > (S32)curl_max_total_concurrent_connections) + MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests); + if (max_pipelined_requests_w->count > (S32)curl_max_total_concurrent_connections && + sTime_40ms > max_pipelined_requests_w->last_decrement) { // Decrement the threshold because since the last call to this function at least one curl request finished // and was replaced with another request from the queue, but the queue never ran empty: we have too many // queued requests. - --max_pipelined_requests; + max_pipelined_requests_w->count--; + // Do this at most once every 40 ms. + max_pipelined_requests_w->last_decrement = sTime_40ms; } } // Check if it's ok to get a new request for this particular service and update the per-service threshold. - AIAverage* http_bandwidth_ptr; - + bool reject, equal, increment_threshold; { - PerServiceRequestQueue_wat per_service_w(*per_service); + PerService_wat per_service_w(*per_service); S32 const pipelined_requests_per_service = per_service_w->pipelined_requests(); + //llassert(pipelined_requests_per_service >= 0 && pipelined_requests_per_service <= 16); reject = pipelined_requests_per_service >= per_service_w->mMaxPipelinedRequests; equal = pipelined_requests_per_service == per_service_w->mMaxPipelinedRequests; increment_threshold = per_service_w->mRequestStarvation; decrement_threshold = per_service_w->mQueueFull && !per_service_w->mQueueEmpty; // Reset flags. per_service_w->mQueueFull = per_service_w->mQueueEmpty = per_service_w->mRequestStarvation = false; - // Grab per service bandwidth object. - http_bandwidth_ptr = &per_service_w->bandwidth(); if (decrement_threshold) { if (per_service_w->mMaxPipelinedRequests > per_service_w->mConcurrectConnections) @@ -2655,106 +2673,121 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service, reject = !equal; } } + if (!reject) + { + // Before releasing the lock on per_service, stop other threads from getting a + // too small value from pipelined_requests() and approving too many requests. + per_service_w->mApprovedRequests++; + //llassert(per_service_w->mApprovedRequests <= 16); + } } if (reject) { - // Too many request for this host already. + // Too many request for this service already. return false; } - if (!no_bandwidth_throttling) + // Throttle on bandwidth usage. + if (checkBandwidthUsage(per_service, sTime_40ms)) { - // Throttle on bandwidth usage. - - static size_t throttle_fraction = 1024; // A value between 0 and 1024: each service is throttled when it uses more than max_bandwidth * (throttle_fraction/1024) bandwidth. - static AIAverage fraction(25); // Average over 25 * 40ms = 1 second. - static U64 last_sTime_40ms = 0; - - // Truncate the sums to the last second, and get their value. - U64 const sTime_40ms = get_clock_count() * HTTPTimeout::sClockWidth_40ms; // Time in 40ms units. - size_t const max_bandwidth = 125.f * max_kbps; // Convert kbps to bytes per second. - size_t const total_bandwidth = BufferedCurlEasyRequest::sHTTPBandwidth.truncateData(sTime_40ms); // Bytes received in the past second. - size_t const service_bandwidth = http_bandwidth_ptr->truncateData(sTime_40ms); // Idem for just this service. - if (sTime_40ms > last_sTime_40ms) - { - // Only add throttle_fraction once every 40 ms at most. - // It's ok to ignore other values in the same 40 ms because the value only changes on the scale of 1 second. - fraction.addData(throttle_fraction, sTime_40ms); - last_sTime_40ms = sTime_40ms; - } - double fraction_avg = fraction.getAverage(1024.0); // throttle_fraction averaged over the past second, or 1024 if there is no data. - - // Adjust throttle_fraction based on total bandwidth usage. - if (total_bandwidth == 0) - throttle_fraction = 1024; - else - { - // This is the main formula. It can be made plausible by assuming - // an equilibrium where total_bandwidth == max_bandwidth and - // thus throttle_fraction == fraction_avg for more than a second. - // - // Then, more bandwidth is being used (for example because another - // service starts downloading). Assuming that all services that use - // a significant portion of the bandwidth, the new service included, - // must be throttled (all using the same bandwidth; note that the - // new service is immediately throttled at the same value), then - // the limit should be reduced linear with the fraction: - // max_bandwidth / total_bandwidth. - // - // For example, let max_bandwidth be 1. Let there be two throttled - // services, each using 0.5 (fraction_avg = 1024/2). Lets the new - // service use what it can: also 0.5 - then without reduction the - // total_bandwidth would become 1.5, and throttle_fraction would - // become (1024/2) * 1/1.5 = 1024/3: from 2 to 3 services. - // - // In reality, total_bandwidth would rise linear from 1.0 to 1.5 in - // one second if the throttle fraction wasn't changed. However it is - // changed here. The end result is that any change more or less - // linear fades away in one second. - throttle_fraction = fraction_avg * max_bandwidth / total_bandwidth; - } - if (throttle_fraction > 1024) - throttle_fraction = 1024; - if (total_bandwidth > max_bandwidth) - { - throttle_fraction *= 0.95; - } - - // Throttle this service if it uses too much bandwidth. - if (service_bandwidth > (max_bandwidth * throttle_fraction / 1024)) - { - return false; // wait - } + // Too much bandwidth is being used, either in total or for this service. + PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all. + return false; } // Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate. - { - command_queue_rat command_queue_r(command_queue); - S32 const pipelined_requests = command_queue_r->size + sTotalQueued + MultiHandle::total_added_size(); - // We can't take the command being processed (command_being_processed) into account without - // introducing relatively long waiting times for some mutex (namely between when the command - // is moved from command_queue to command_being_processed, till it's actually being added to - // mAddedEasyRequests). The whole purpose of command_being_processed is to reduce the time - // that things are locked to micro seconds, so we'll just accept an off-by-one fuzziness - // here instead. + S32 const pipelined_requests = command_queue_rat(command_queue)->size + total_queued_or_added; + // We can't take the command being processed (command_being_processed) into account without + // introducing relatively long waiting times for some mutex (namely between when the command + // is moved from command_queue to command_being_processed, till it's actually being added to + // mAddedEasyRequests). The whole purpose of command_being_processed is to reduce the time + // that things are locked to micro seconds, so we'll just accept an off-by-one fuzziness + // here instead. - // The maximum number of requests that may be queued in command_queue is equal to the total number of requests - // that may exist in the pipeline minus the number of requests queued in AIPerService objects, minus - // the number of already running requests. - reject = pipelined_requests >= max_pipelined_requests_cache; - equal = pipelined_requests == max_pipelined_requests_cache; - increment_threshold = sRequestStarvation; - } + // The maximum number of requests that may be queued in command_queue is equal to the total number of requests + // that may exist in the pipeline minus the number of requests queued in AIPerService objects, minus + // the number of already running requests. + MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests); + reject = pipelined_requests >= max_pipelined_requests_w->count; + equal = pipelined_requests == max_pipelined_requests_w->count; + increment_threshold = starvation; if (increment_threshold && reject) { - if (max_pipelined_requests_cache < 2 * (S32)curl_max_total_concurrent_connections) + if (max_pipelined_requests_w->count < 2 * (S32)curl_max_total_concurrent_connections && + sTime_40ms > max_pipelined_requests_w->last_increment) { - max_pipelined_requests++; + max_pipelined_requests_w->count++; + max_pipelined_requests_w->last_increment = sTime_40ms; // Immediately take the new threshold into account. reject = !equal; } } + if (reject) + { + PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all. + } return !reject; } +bool AIPerService::checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms) +{ + if (sNoHTTPBandwidthThrottling) + return false; + + using namespace AICurlPrivate; + + // Truncate the sums to the last second, and get their value. + size_t const max_bandwidth = AIPerService::getHTTPThrottleBandwidth125(); + size_t const total_bandwidth = BufferedCurlEasyRequest::sHTTPBandwidth.truncateData(sTime_40ms); // Bytes received in the past second. + size_t const service_bandwidth = PerService_wat(*per_service)->bandwidth().truncateData(sTime_40ms); // Idem for just this service. + ThrottleFraction_wat throttle_fraction_w(sThrottleFraction); + if (sTime_40ms > throttle_fraction_w->last_add) + { + throttle_fraction_w->average.addData(throttle_fraction_w->fraction, sTime_40ms); + // Only add throttle_fraction_w->fraction once every 40 ms at most. + // It's ok to ignore other values in the same 40 ms because the value only changes on the scale of 1 second. + throttle_fraction_w->last_add = sTime_40ms; + } + double fraction_avg = throttle_fraction_w->average.getAverage(1024.0); // throttle_fraction_w->fraction averaged over the past second, or 1024 if there is no data. + + // Adjust the fraction based on total bandwidth usage. + if (total_bandwidth == 0) + throttle_fraction_w->fraction = 1024; + else + { + // This is the main formula. It can be made plausible by assuming + // an equilibrium where total_bandwidth == max_bandwidth and + // thus fraction == fraction_avg for more than a second. + // + // Then, more bandwidth is being used (for example because another + // service starts downloading). Assuming that all services that use + // a significant portion of the bandwidth, the new service included, + // must be throttled (all using the same bandwidth; note that the + // new service is immediately throttled at the same value), then + // the limit should be reduced linear with the fraction: + // max_bandwidth / total_bandwidth. + // + // For example, let max_bandwidth be 1. Let there be two throttled + // services, each using 0.5 (fraction_avg = 1024/2). Let the new + // service use what it can: also 0.5 - then without reduction the + // total_bandwidth would become 1.5, and fraction would + // become (1024/2) * 1/1.5 = 1024/3: from 2 to 3 services. + // + // In reality, total_bandwidth would rise linear from 1.0 to 1.5 in + // one second if the throttle fraction wasn't changed. However it is + // changed here. The end result is that any change more or less + // linear fades away in one second. + throttle_fraction_w->fraction = llmin(1024., fraction_avg * max_bandwidth / total_bandwidth + 0.5); + } + if (total_bandwidth > max_bandwidth) + { + throttle_fraction_w->fraction *= 0.95; + } + + // Throttle this service if it uses too much bandwidth. + // Warning: this requires max_bandwidth * 1024 to fit in a size_t. + // On 32 bit that means that HTTPThrottleBandwidth must be less than 33554 kbps. + return (service_bandwidth > (max_bandwidth * throttle_fraction_w->fraction / 1024)); +} + diff --git a/indra/llmessage/llhttpclient.cpp b/indra/llmessage/llhttpclient.cpp index c130938d2..d5032e8d7 100644 --- a/indra/llmessage/llhttpclient.cpp +++ b/indra/llmessage/llhttpclient.cpp @@ -208,7 +208,8 @@ void LLHTTPClient::request( EAllowCompressedReply allow_compression, AIStateMachine* parent, AIStateMachine::state_type new_parent_state, - AIEngine* default_engine) + AIEngine* default_engine, + bool queue_if_too_much_bandwidth_usage) { llassert(responder); @@ -221,7 +222,7 @@ void LLHTTPClient::request( LLURLRequest* req; try { - req = new LLURLRequest(method, url, body_injector, responder, headers, keepalive, does_auth, allow_compression); + req = new LLURLRequest(method, url, body_injector, responder, headers, keepalive, does_auth, allow_compression, queue_if_too_much_bandwidth_usage); #ifdef DEBUG_CURLIO req->mCurlEasyRequest.debug(debug); #endif @@ -701,6 +702,11 @@ void LLHTTPClient::post(std::string const& url, LLSD const& body, ResponderPtr r request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state); } +void LLHTTPClient::post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state) +{ + request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state, &gMainThreadEngine, false); +} + void LLHTTPClient::postXMLRPC(std::string const& url, XMLRPC_REQUEST xmlrpc_request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) { request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, allow_compressed_reply); diff --git a/indra/llmessage/llhttpclient.h b/indra/llmessage/llhttpclient.h index 6dc10080c..05e028f04 100644 --- a/indra/llmessage/llhttpclient.h +++ b/indra/llmessage/llhttpclient.h @@ -433,7 +433,8 @@ public: EAllowCompressedReply allow_compression = allow_compressed_reply, AIStateMachine* parent = NULL, /*AIStateMachine::state_type*/ U32 new_parent_state = 0, - AIEngine* default_engine = &gMainThreadEngine); + AIEngine* default_engine = &gMainThreadEngine, + bool queue_if_too_much_bandwidth_usage = true); /** @name non-blocking API */ //@{ @@ -465,6 +466,10 @@ public: static void post(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0) { AIHTTPHeaders headers; post(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); } + static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0); + static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0) + { AIHTTPHeaders headers; post_nb(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); } + /** Takes ownership of request and deletes it when sent */ static void postXMLRPC(std::string const& url, XMLRPC_REQUEST request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive); static void postXMLRPC(std::string const& url, XMLRPC_REQUEST request, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive) diff --git a/indra/llmessage/llurlrequest.cpp b/indra/llmessage/llurlrequest.cpp index c8be0939b..2d4438804 100644 --- a/indra/llmessage/llurlrequest.cpp +++ b/indra/llmessage/llurlrequest.cpp @@ -75,10 +75,15 @@ std::string LLURLRequest::actionAsVerb(LLURLRequest::ERequestAction action) // This might throw AICurlNoEasyHandle. LLURLRequest::LLURLRequest(LLURLRequest::ERequestAction action, std::string const& url, Injector* body, - LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool compression) : + LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool compression, + bool queue_if_too_much_bandwidth_usage) : mAction(action), mURL(url), mKeepAlive(keepalive), mIsAuth(is_auth), mNoCompression(!compression), mBody(body), mResponder(responder), mHeaders(headers), mResponderNameCache(responder ? responder->getName() : "") { + if (queue_if_too_much_bandwidth_usage) + { + AICurlEasyRequest_wat(*mCurlEasyRequest)->queue_if_too_much_bandwidth_usage(); + } } void LLURLRequest::initialize_impl(void) diff --git a/indra/llmessage/llurlrequest.h b/indra/llmessage/llurlrequest.h index 1f9c09914..d31cc1918 100644 --- a/indra/llmessage/llurlrequest.h +++ b/indra/llmessage/llurlrequest.h @@ -64,7 +64,9 @@ class LLURLRequest : public AICurlEasyRequestStateMachine { * @param action One of the ERequestAction enumerations. * @param url The url of the request. It should already be encoded. */ - LLURLRequest(ERequestAction action, std::string const& url, Injector* body, LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool no_compression); + LLURLRequest(ERequestAction action, std::string const& url, Injector* body, + LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, + bool keepalive, bool is_auth, bool no_compression, bool queue_if_too_much_bandwidth_usage); /** * @brief Cached value of responder->getName() as passed to the constructor. diff --git a/indra/newview/app_settings/settings.xml b/indra/newview/app_settings/settings.xml index 61f8f21bc..e40ed23d7 100644 --- a/indra/newview/app_settings/settings.xml +++ b/indra/newview/app_settings/settings.xml @@ -175,7 +175,10 @@ HTTPThrottleBandwidth Comment - The bandwidth (in kbit/s) to strive for + The bandwidth (in kbit/s) to strive for. Smaller values might reduce network + congestion (sim ping time, aka avatar responsiveness). Higher values might download + textures and the inventory faster, although in some cases a too high value might + actually slow that down due to serverside throttling. If unsure, choose 2000. Persist 1 Type @@ -4468,7 +4471,7 @@ This should be as low as possible, but too low may break functionality Type U32 Value - 16 + 8 CurlTimeoutDNSLookup diff --git a/indra/newview/llinventorymodelbackgroundfetch.cpp b/indra/newview/llinventorymodelbackgroundfetch.cpp index 272a0aad8..0f1d93c5c 100644 --- a/indra/newview/llinventorymodelbackgroundfetch.cpp +++ b/indra/newview/llinventorymodelbackgroundfetch.cpp @@ -600,12 +600,14 @@ void LLInventoryModelBackgroundFetch::bulkFetch() LLViewerRegion* region = gAgent.getRegion(); if (gDisconnected || !region) return; - static LLCachedControl const throttle_bandwidth("HTTPThrottleBandwidth", 2000); - bool const no_bandwidth_throttling = gHippoGridManager->getConnectedGrid()->isAvination(); - if (!AIPerService::wantsMoreHTTPRequestsFor(mPerServicePtr, throttle_bandwidth, no_bandwidth_throttling)) + if (!AIPerService::wantsMoreHTTPRequestsFor(mPerServicePtr)) { return; // Wait. } + // If AIPerService::wantsMoreHTTPRequestsFor returns true, then it approved ONE request. + // The code below might fire off zero, one or even more than one requests however! + // This object keeps track of that. + AIPerService::Approvement approvement(mPerServicePtr); U32 item_count=0; U32 folder_count=0; @@ -699,14 +701,16 @@ void LLInventoryModelBackgroundFetch::bulkFetch() if (folder_request_body["folders"].size()) { LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body, recursive_cats); - LLHTTPClient::post(url, folder_request_body, fetcher); + LLHTTPClient::post_nb(url, folder_request_body, fetcher); + approvement.honored(); } if (folder_request_body_lib["folders"].size()) { std::string url_lib = gAgent.getRegion()->getCapability("FetchLibDescendents2"); LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body_lib, recursive_cats); - LLHTTPClient::post(url_lib, folder_request_body_lib, fetcher); + LLHTTPClient::post_nb(url_lib, folder_request_body_lib, fetcher); + approvement.honored(); } } if (item_count) @@ -724,7 +728,8 @@ void LLInventoryModelBackgroundFetch::bulkFetch() body["agent_id"] = gAgent.getID(); body["items"] = item_request_body; - LLHTTPClient::post(url, body, new LLInventoryModelFetchItemResponder(body)); + LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body)); + approvement.honored(); } } @@ -740,13 +745,13 @@ void LLInventoryModelBackgroundFetch::bulkFetch() body["agent_id"] = gAgent.getID(); body["items"] = item_request_body_lib; - LLHTTPClient::post(url, body, new LLInventoryModelFetchItemResponder(body)); + LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body)); + approvement.honored(); } } } mFetchTimer.reset(); } - else if (isBulkFetchProcessingComplete()) { llinfos << "Inventory fetch completed" << llendl; diff --git a/indra/newview/llstartup.cpp b/indra/newview/llstartup.cpp index d276909d3..455cf3820 100644 --- a/indra/newview/llstartup.cpp +++ b/indra/newview/llstartup.cpp @@ -1008,6 +1008,8 @@ bool idle_startup() LLTrans::setDefaultArg("[GRID_OWNER]", gHippoGridManager->getConnectedGrid()->getGridOwner()); LLScriptEdCore::parseFunctions("lsl_functions_os.xml"); //Singu Note: This appends to the base functions parsed from lsl_functions_sl.xml } + // Avination doesn't want the viewer to do bandwidth throttling (it is done serverside, taking UDP into account too). + AIPerService::setNoHTTPBandwidthThrottling(gHippoGridManager->getConnectedGrid()->isAvination()); // create necessary directories // *FIX: these mkdir's should error check diff --git a/indra/newview/lltexturefetch.cpp b/indra/newview/lltexturefetch.cpp index f80bd9539..56a6267b2 100644 --- a/indra/newview/lltexturefetch.cpp +++ b/indra/newview/lltexturefetch.cpp @@ -1272,12 +1272,13 @@ bool LLTextureFetchWorker::doWork(S32 param) } // Let AICurl decide if we can process more HTTP requests at the moment or not. - static const LLCachedControl throttle_bandwidth("HTTPThrottleBandwidth", 2000); - bool const no_bandwidth_throttling = gHippoGridManager->getConnectedGrid()->isAvination(); - if (!AIPerService::wantsMoreHTTPRequestsFor(mPerServicePtr, throttle_bandwidth, no_bandwidth_throttling)) + if (!AIPerService::wantsMoreHTTPRequestsFor(mPerServicePtr)) { return false ; //wait. } + // If AIPerService::wantsMoreHTTPRequestsFor returns true then it approved ONE request. + // This object keeps track of whether or not that is honored. + AIPerService::Approvement approvement(mPerServicePtr); mFetcher->removeFromNetworkQueue(this, false); @@ -1323,7 +1324,9 @@ bool LLTextureFetchWorker::doWork(S32 param) } LLHTTPClient::request(mUrl, LLHTTPClient::HTTP_GET, NULL, new HTTPGetResponder(mFetcher, mID, LLTimer::getTotalTime(), mRequestedSize, mRequestedOffset, true), - headers/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL); + headers/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL, false); + // Now the request was added to the command queue. + approvement.honored(); res = true; } if (!res) diff --git a/indra/newview/lltextureview.cpp b/indra/newview/lltextureview.cpp index 944870e32..41c819b5f 100644 --- a/indra/newview/lltextureview.cpp +++ b/indra/newview/lltextureview.cpp @@ -37,6 +37,7 @@ #include "llimageworker.h" #include "llrender.h" +#include "aicurlperservice.h" #include "llappviewer.h" #include "llselectmgr.h" #include "llviewertexlayer.h" @@ -621,18 +622,15 @@ void LLGLTexMemBar::draw() text_color, LLFontGL::LEFT, LLFontGL::TOP); left += LLFontGL::getFontMonospace()->getWidth(text); - // This bandwidth is averaged over 1 seconds (in kbps). - F32 bandwidth = AICurlInterface::getHTTPBandwidth() / 125.f; // Convert from bytes/s to kbps. - // This is the maximum bandwidth allowed for curl transactions (of any type and averaged per second), - // that is actually used to limit the number of HTTP texture requests (and only those). - // Comparing that with 'bandwidth' is a bit like comparing apples and oranges, but again... who really cares. - static const LLCachedControl max_bandwidth("HTTPThrottleBandwidth", 2000); - color = bandwidth > max_bandwidth ? LLColor4::red : bandwidth > max_bandwidth*.75f ? LLColor4::yellow : text_color; + // This bandwidth is averaged over 1 seconds (in bytes/s). + size_t const bandwidth = AICurlInterface::getHTTPBandwidth(); + size_t const max_bandwidth = AIPerService::getHTTPThrottleBandwidth125(); + color = (bandwidth > max_bandwidth) ? LLColor4::red : ((bandwidth > max_bandwidth * .75f) ? LLColor4::yellow : text_color); color[VALPHA] = text_color[VALPHA]; - text = llformat("BW:%.0f/%.0f", bandwidth, max_bandwidth.get()); + text = llformat("BW:%lu/%lu", bandwidth / 125, max_bandwidth / 125); LLFontGL::getFontMonospace()->renderUTF8(text, 0, left, v_offset + line_height*2, color, LLFontGL::LEFT, LLFontGL::TOP); - + S32 dx1 = 0; if (LLAppViewer::getTextureFetch()->mDebugPause) { diff --git a/indra/newview/llviewercontrol.cpp b/indra/newview/llviewercontrol.cpp index 1fa7a4eca..59958523b 100644 --- a/indra/newview/llviewercontrol.cpp +++ b/indra/newview/llviewercontrol.cpp @@ -84,7 +84,6 @@ #include "aicurl.h" #include "aihttptimeoutpolicy.h" - #ifdef TOGGLE_HACKED_GODLIKE_VIEWER BOOL gHackGodmode = FALSE; #endif @@ -330,6 +329,12 @@ static bool handleBandwidthChanged(const LLSD& newvalue) return true; } +static bool handleHTTPBandwidthChanged(const LLSD& newvalue) +{ + AIPerService::setHTTPThrottleBandwidth((F32) newvalue.asReal()); + return true; +} + static bool handleChatFontSizeChanged(const LLSD& newvalue) { if(gConsole) @@ -666,6 +671,7 @@ void settings_setup_listeners() gSavedSettings.getControl("RenderTreeLODFactor")->getSignal()->connect(boost::bind(&handleTreeLODChanged, _2)); gSavedSettings.getControl("RenderFlexTimeFactor")->getSignal()->connect(boost::bind(&handleFlexLODChanged, _2)); gSavedSettings.getControl("ThrottleBandwidthKBPS")->getSignal()->connect(boost::bind(&handleBandwidthChanged, _2)); + gSavedSettings.getControl("HTTPThrottleBandwidth")->getSignal()->connect(boost::bind(&handleHTTPBandwidthChanged, _2)); gSavedSettings.getControl("RenderGamma")->getSignal()->connect(boost::bind(&handleGammaChanged, _2)); gSavedSettings.getControl("RenderFogRatio")->getSignal()->connect(boost::bind(&handleFogRatioChanged, _2)); gSavedSettings.getControl("RenderMaxPartCount")->getSignal()->connect(boost::bind(&handleMaxPartCountChanged, _2)); diff --git a/indra/newview/llviewerregion.cpp b/indra/newview/llviewerregion.cpp index 6c09d31e4..cfdf5ef43 100644 --- a/indra/newview/llviewerregion.cpp +++ b/indra/newview/llviewerregion.cpp @@ -1597,8 +1597,9 @@ void LLViewerRegion::unpackRegionHandshake() // all of our terrain stuff, by if (compp->getParamsReady()) { - //this line creates frame stalls on region crossing and removing it appears to have no effect - //getLand().dirtyAllPatches(); + // The following line was commented out in http://hg.secondlife.com/viewer-development/commits/448b02f5b56f9e608952c810df5454f83051a992 + // by davep. However, this is needed to see changes in region/estate texture elevation ranges, and to update the terrain textures after terraforming. + getLand().dirtyAllPatches(); } else {