diff --git a/indra/llmessage/aicurl.cpp b/indra/llmessage/aicurl.cpp index 06c7f0ab4..939bd0280 100644 --- a/indra/llmessage/aicurl.cpp +++ b/indra/llmessage/aicurl.cpp @@ -1269,7 +1269,8 @@ LLMutex BufferedCurlEasyRequest::sResponderCallbackMutex; bool BufferedCurlEasyRequest::sShuttingDown = false; AIAverage BufferedCurlEasyRequest::sHTTPBandwidth(25); -BufferedCurlEasyRequest::BufferedCurlEasyRequest() : mRequestTransferedBytes(0), mTotalRawBytes(0), mBufferEventsTarget(NULL), mStatus(HTTP_INTERNAL_ERROR_OTHER) +BufferedCurlEasyRequest::BufferedCurlEasyRequest() : + mRequestTransferedBytes(0), mTotalRawBytes(0), mBufferEventsTarget(NULL), mStatus(HTTP_INTERNAL_ERROR_OTHER), mQueueIfTooMuchBandwidthUsage(false) { AICurlInterface::Stats::BufferedCurlEasyRequest_count++; } diff --git a/indra/llmessage/aicurlperservice.h b/indra/llmessage/aicurlperservice.h index 61afd2cc7..6b33b5c1b 100644 --- a/indra/llmessage/aicurlperservice.h +++ b/indra/llmessage/aicurlperservice.h @@ -75,11 +75,11 @@ typedef boost::intrusive_ptr instance_map_type; @@ -136,7 +136,7 @@ class AIPerService { static U64 sLastTime_sMaxPipelinedRequests_increment; // Last time that sMaxPipelinedRequests was incremented. static U64 sLastTime_sMaxPipelinedRequests_decrement; // Last time that sMaxPipelinedRequests was decremented. static U64 sLastTime_ThrottleFractionAverage_add; // Last time that sThrottleFraction was added to sThrottleFractionAverage. - static size_t sThrottleFraction; // A value between 0 and 1024: each service is throttled when it uses more than max_bandwidth * (sThrottleFraction/1024) bandwidth. + static AIThreadSafeSimpleDC sThrottleFraction; // A value between 0 and 1024: each service is throttled when it uses more than max_bandwidth * (sThrottleFraction/1024) bandwidth. static AIAverage sThrottleFractionAverage; // Average of sThrottleFraction over 25 * 40ms = 1 second. static size_t sHTTPThrottleBandwidth125; // HTTPThrottleBandwidth times 125 (in bytes/s). static bool sNoHTTPBandwidthThrottling; // Global override to disable bandwidth throttling. @@ -173,7 +173,7 @@ class AIPerService { // the latency between request and actual delivery becomes too large. static bool wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service); // Return true if too much bandwidth is being used. - static bool checkBandwidthUsage(U64 sTime_40ms, AIAverage* http_bandwidth_ptr); + static bool checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth_ptr); // Accessor for when curl_max_total_concurrent_connections changes. static LLAtomicS32& maxPipelinedRequests(void) { return sMaxPipelinedRequests; } diff --git a/indra/llmessage/aicurlprivate.h b/indra/llmessage/aicurlprivate.h index 57e6d9210..de3e6b4b1 100644 --- a/indra/llmessage/aicurlprivate.h +++ b/indra/llmessage/aicurlprivate.h @@ -375,6 +375,9 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { void resetState(void); void prepRequest(AICurlEasyRequest_wat& buffered_curl_easy_request_w, AIHTTPHeaders const& headers, LLHTTPClient::ResponderPtr responder); + // Called if this request should be queued on the curl thread when too much bandwidth is being used. + void queue_if_too_much_bandwidth_usage(void) { mQueueIfTooMuchBandwidthUsage = true; } + buffer_ptr_t& getInput(void) { return mInput; } buffer_ptr_t& getOutput(void) { return mOutput; } @@ -416,6 +419,7 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { U32 mRequestTransferedBytes; size_t mTotalRawBytes; // Raw body data (still, possibly, compressed) received from the server so far. AIBufferedCurlEasyRequestEvents* mBufferEventsTarget; + bool mQueueIfTooMuchBandwidthUsage; // Set if the curl thread should check bandwidth usage and queue this request if too much is being used. public: static LLChannelDescriptors const sChannels; // Channel object for mInput (channel out()) and mOutput (channel in()). @@ -452,6 +456,9 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { // Return true when prepRequest was already called and the object has not been // invalidated as a result of calling timed_out(). bool isValid(void) const { return mResponder; } + + // Returns true when this request should be queued by the curl thread when too much bandwidth is being used. + bool queueIfTooMuchBandwidthUsage(void) const { return mQueueIfTooMuchBandwidthUsage; } }; inline ThreadSafeBufferedCurlEasyRequest* CurlEasyRequest::get_lockobj(void) diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index 2a0941db8..fcffc6d39 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -1708,8 +1708,11 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) { AICurlEasyRequest_wat curl_easy_request_w(*easy_request); per_service = curl_easy_request_w->getPerServicePtr(); + bool too_much_bandwidth = curl_easy_request_w->queueIfTooMuchBandwidthUsage() && + AIPerService::checkBandwidthUsage(get_clock_count() * HTTPTimeout::sClockWidth_40ms, + PerServiceRequestQueue_wat(*per_service)->bandwidth()); PerServiceRequestQueue_wat per_service_w(*per_service); - if (mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled()) + if (!too_much_bandwidth && mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled()) { curl_easy_request_w->set_timeout_opts(); if (curl_easy_request_w->add_handle_to_multi(curl_easy_request_w, mMultiHandle) == CURLM_OK) @@ -2578,7 +2581,7 @@ U64 AIPerService::sLastTime_sMaxPipelinedRequests_increment = 0; U64 AIPerService::sLastTime_sMaxPipelinedRequests_decrement = 0; LLAtomicS32 AIPerService::sMaxPipelinedRequests(32); U64 AIPerService::sLastTime_ThrottleFractionAverage_add = 0; -size_t AIPerService::sThrottleFraction = 1024; +AIThreadSafeSimpleDC AIPerService::sThrottleFraction(1024); AIAverage AIPerService::sThrottleFractionAverage(25); size_t AIPerService::sHTTPThrottleBandwidth125 = 250000; bool AIPerService::sNoHTTPBandwidthThrottling; @@ -2678,7 +2681,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) } // Throttle on bandwidth usage. - if (checkBandwidthUsage(sTime_40ms, http_bandwidth_ptr)) + if (checkBandwidthUsage(sTime_40ms, *http_bandwidth_ptr)) { // Too much bandwidth is being used, either in total or for this service. return false; @@ -2717,7 +2720,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) return !reject; } -bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage* http_bandwidth_ptr) +bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth) { if (sNoHTTPBandwidthThrottling) return false; @@ -2727,19 +2730,21 @@ bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage* http_bandwidth // Truncate the sums to the last second, and get their value. size_t const max_bandwidth = AIPerService::getHTTPThrottleBandwidth125(); size_t const total_bandwidth = BufferedCurlEasyRequest::sHTTPBandwidth.truncateData(sTime_40ms); // Bytes received in the past second. - size_t const service_bandwidth = http_bandwidth_ptr->truncateData(sTime_40ms); // Idem for just this service. + size_t const service_bandwidth = http_bandwidth.truncateData(sTime_40ms); // Idem for just this service. + AIAccess throttle_fraction_w(sThrottleFraction); + // Note that sLastTime_ThrottleFractionAverage_add is protected by the lock on sThrottleFraction... if (sTime_40ms > sLastTime_ThrottleFractionAverage_add) { - sThrottleFractionAverage.addData(sThrottleFraction, sTime_40ms); + sThrottleFractionAverage.addData(*throttle_fraction_w, sTime_40ms); // Only add sThrottleFraction once every 40 ms at most. // It's ok to ignore other values in the same 40 ms because the value only changes on the scale of 1 second. sLastTime_ThrottleFractionAverage_add = sTime_40ms; } - double fraction_avg = sThrottleFractionAverage.getAverage(1024.0); // sThrottleFraction averaged over the past second, or 1024 if there is no data. + double fraction_avg = sThrottleFractionAverage.getAverage(1024.0); // sThrottleFraction averaged over the past second, or 1024 if there is no data. // Adjust sThrottleFraction based on total bandwidth usage. if (total_bandwidth == 0) - sThrottleFraction = 1024; + *throttle_fraction_w = 1024; else { // This is the main formula. It can be made plausible by assuming @@ -2764,16 +2769,16 @@ bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage* http_bandwidth // one second if the throttle fraction wasn't changed. However it is // changed here. The end result is that any change more or less // linear fades away in one second. - sThrottleFraction = fraction_avg * max_bandwidth / total_bandwidth; + *throttle_fraction_w = fraction_avg * max_bandwidth / total_bandwidth + 0.5; } - if (sThrottleFraction > 1024) - sThrottleFraction = 1024; + if (*throttle_fraction_w > 1024) + *throttle_fraction_w = 1024; if (total_bandwidth > max_bandwidth) { - sThrottleFraction *= 0.95; + *throttle_fraction_w *= 0.95; } // Throttle this service if it uses too much bandwidth. - return (service_bandwidth > (max_bandwidth * sThrottleFraction / 1024)); + return (service_bandwidth > (max_bandwidth * *throttle_fraction_w / 1024)); } diff --git a/indra/llmessage/llhttpclient.cpp b/indra/llmessage/llhttpclient.cpp index c130938d2..d5032e8d7 100644 --- a/indra/llmessage/llhttpclient.cpp +++ b/indra/llmessage/llhttpclient.cpp @@ -208,7 +208,8 @@ void LLHTTPClient::request( EAllowCompressedReply allow_compression, AIStateMachine* parent, AIStateMachine::state_type new_parent_state, - AIEngine* default_engine) + AIEngine* default_engine, + bool queue_if_too_much_bandwidth_usage) { llassert(responder); @@ -221,7 +222,7 @@ void LLHTTPClient::request( LLURLRequest* req; try { - req = new LLURLRequest(method, url, body_injector, responder, headers, keepalive, does_auth, allow_compression); + req = new LLURLRequest(method, url, body_injector, responder, headers, keepalive, does_auth, allow_compression, queue_if_too_much_bandwidth_usage); #ifdef DEBUG_CURLIO req->mCurlEasyRequest.debug(debug); #endif @@ -701,6 +702,11 @@ void LLHTTPClient::post(std::string const& url, LLSD const& body, ResponderPtr r request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state); } +void LLHTTPClient::post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state) +{ + request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state, &gMainThreadEngine, false); +} + void LLHTTPClient::postXMLRPC(std::string const& url, XMLRPC_REQUEST xmlrpc_request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) { request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, allow_compressed_reply); diff --git a/indra/llmessage/llhttpclient.h b/indra/llmessage/llhttpclient.h index 6dc10080c..05e028f04 100644 --- a/indra/llmessage/llhttpclient.h +++ b/indra/llmessage/llhttpclient.h @@ -433,7 +433,8 @@ public: EAllowCompressedReply allow_compression = allow_compressed_reply, AIStateMachine* parent = NULL, /*AIStateMachine::state_type*/ U32 new_parent_state = 0, - AIEngine* default_engine = &gMainThreadEngine); + AIEngine* default_engine = &gMainThreadEngine, + bool queue_if_too_much_bandwidth_usage = true); /** @name non-blocking API */ //@{ @@ -465,6 +466,10 @@ public: static void post(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0) { AIHTTPHeaders headers; post(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); } + static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0); + static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0) + { AIHTTPHeaders headers; post_nb(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); } + /** Takes ownership of request and deletes it when sent */ static void postXMLRPC(std::string const& url, XMLRPC_REQUEST request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive); static void postXMLRPC(std::string const& url, XMLRPC_REQUEST request, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive) diff --git a/indra/llmessage/llurlrequest.cpp b/indra/llmessage/llurlrequest.cpp index c8be0939b..2d4438804 100644 --- a/indra/llmessage/llurlrequest.cpp +++ b/indra/llmessage/llurlrequest.cpp @@ -75,10 +75,15 @@ std::string LLURLRequest::actionAsVerb(LLURLRequest::ERequestAction action) // This might throw AICurlNoEasyHandle. LLURLRequest::LLURLRequest(LLURLRequest::ERequestAction action, std::string const& url, Injector* body, - LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool compression) : + LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool compression, + bool queue_if_too_much_bandwidth_usage) : mAction(action), mURL(url), mKeepAlive(keepalive), mIsAuth(is_auth), mNoCompression(!compression), mBody(body), mResponder(responder), mHeaders(headers), mResponderNameCache(responder ? responder->getName() : "") { + if (queue_if_too_much_bandwidth_usage) + { + AICurlEasyRequest_wat(*mCurlEasyRequest)->queue_if_too_much_bandwidth_usage(); + } } void LLURLRequest::initialize_impl(void) diff --git a/indra/llmessage/llurlrequest.h b/indra/llmessage/llurlrequest.h index 1f9c09914..d31cc1918 100644 --- a/indra/llmessage/llurlrequest.h +++ b/indra/llmessage/llurlrequest.h @@ -64,7 +64,9 @@ class LLURLRequest : public AICurlEasyRequestStateMachine { * @param action One of the ERequestAction enumerations. * @param url The url of the request. It should already be encoded. */ - LLURLRequest(ERequestAction action, std::string const& url, Injector* body, LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool no_compression); + LLURLRequest(ERequestAction action, std::string const& url, Injector* body, + LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, + bool keepalive, bool is_auth, bool no_compression, bool queue_if_too_much_bandwidth_usage); /** * @brief Cached value of responder->getName() as passed to the constructor. diff --git a/indra/newview/llinventorymodelbackgroundfetch.cpp b/indra/newview/llinventorymodelbackgroundfetch.cpp index 1400b7f14..ebeaba505 100644 --- a/indra/newview/llinventorymodelbackgroundfetch.cpp +++ b/indra/newview/llinventorymodelbackgroundfetch.cpp @@ -697,14 +697,14 @@ void LLInventoryModelBackgroundFetch::bulkFetch() if (folder_request_body["folders"].size()) { LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body, recursive_cats); - LLHTTPClient::post(url, folder_request_body, fetcher); + LLHTTPClient::post_nb(url, folder_request_body, fetcher); } if (folder_request_body_lib["folders"].size()) { std::string url_lib = gAgent.getRegion()->getCapability("FetchLibDescendents2"); LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body_lib, recursive_cats); - LLHTTPClient::post(url_lib, folder_request_body_lib, fetcher); + LLHTTPClient::post_nb(url_lib, folder_request_body_lib, fetcher); } } if (item_count) @@ -722,7 +722,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() body["agent_id"] = gAgent.getID(); body["items"] = item_request_body; - LLHTTPClient::post(url, body, new LLInventoryModelFetchItemResponder(body)); + LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body)); } } @@ -738,13 +738,12 @@ void LLInventoryModelBackgroundFetch::bulkFetch() body["agent_id"] = gAgent.getID(); body["items"] = item_request_body_lib; - LLHTTPClient::post(url, body, new LLInventoryModelFetchItemResponder(body)); + LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body)); } } } mFetchTimer.reset(); } - else if (isBulkFetchProcessingComplete()) { llinfos << "Inventory fetch completed" << llendl; diff --git a/indra/newview/lltexturefetch.cpp b/indra/newview/lltexturefetch.cpp index 317915050..3206af56f 100644 --- a/indra/newview/lltexturefetch.cpp +++ b/indra/newview/lltexturefetch.cpp @@ -1321,7 +1321,7 @@ bool LLTextureFetchWorker::doWork(S32 param) } LLHTTPClient::request(mUrl, LLHTTPClient::HTTP_GET, NULL, new HTTPGetResponder(mFetcher, mID, LLTimer::getTotalTime(), mRequestedSize, mRequestedOffset, true), - headers/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL); + headers/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL, false); res = true; } if (!res)