diff --git a/indra/llmessage/aicurl.cpp b/indra/llmessage/aicurl.cpp index a9c15c5c4..248568d7e 100644 --- a/indra/llmessage/aicurl.cpp +++ b/indra/llmessage/aicurl.cpp @@ -1270,7 +1270,7 @@ bool BufferedCurlEasyRequest::sShuttingDown = false; AIAverage BufferedCurlEasyRequest::sHTTPBandwidth(25); BufferedCurlEasyRequest::BufferedCurlEasyRequest() : - mRequestTransferedBytes(0), mTotalRawBytes(0), mStatus(HTTP_INTERNAL_ERROR_OTHER), mBufferEventsTarget(NULL), mQueueIfTooMuchBandwidthUsage(false) + mRequestTransferedBytes(0), mTotalRawBytes(0), mStatus(HTTP_INTERNAL_ERROR_OTHER), mBufferEventsTarget(NULL) { AICurlInterface::Stats::BufferedCurlEasyRequest_count++; } diff --git a/indra/llmessage/aicurlperservice.cpp b/indra/llmessage/aicurlperservice.cpp index 8f8573ea6..e97a18969 100644 --- a/indra/llmessage/aicurlperservice.cpp +++ b/indra/llmessage/aicurlperservice.cpp @@ -287,7 +287,11 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle) { if (!mQueuedRequests.empty()) { - multi_handle->add_easy_request(mQueuedRequests.front()); + if (!multi_handle->add_easy_request(mQueuedRequests.front(), true)) + { + // Throttled. + return; + } mQueuedRequests.pop_front(); if (mQueuedRequests.empty()) { @@ -366,3 +370,9 @@ void AIPerService::Approvement::honored(void) } } +void AIPerService::Approvement::not_honored(void) +{ + honored(); + llwarns << "Approvement for has not been honored." << llendl; +} + diff --git a/indra/llmessage/aicurlperservice.h b/indra/llmessage/aicurlperservice.h index eab41ad80..5bcc28063 100644 --- a/indra/llmessage/aicurlperservice.h +++ b/indra/llmessage/aicurlperservice.h @@ -114,7 +114,7 @@ class AIPerService { private: typedef std::deque queued_request_type; - int mApprovedRequests; // The number of approved requests by wantsMoreHTTPRequestsFor that were not added to the command queue yet. + int mApprovedRequests; // The number of approved requests by approveHTTPRequestFor that were not added to the command queue yet. int mQueuedCommands; // Number of add commands (minus remove commands) with this host in the command queue. int mAdded; // Number of active easy handles with this host. queued_request_type mQueuedRequests; // Waiting (throttled) requests. @@ -201,29 +201,30 @@ class AIPerService { // Called when CurlConcurrentConnectionsPerService changes. static void adjust_concurrent_connections(int increment); - // The two following functions are static and have the AIPerService object passed - // as first argument as an AIPerServicePtr because that avoids the need of having - // the AIPerService object locked for the whole duration of the call. - // The functions only lock it when access is required. - - // Returns true if curl can handle another request for this host. - // Should return false if the maximum allowed HTTP bandwidth is reached, or when - // the latency between request and actual delivery becomes too large. - static bool wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service); - // Return true if too much bandwidth is being used. - static bool checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms); - - // A helper class to decrement mApprovedRequests after requests approved by wantsMoreHTTPRequestsFor were handled. - class Approvement { + // A helper class to decrement mApprovedRequests after requests approved by approveHTTPRequestFor were handled. + class Approvement : public LLThreadSafeRefCount { private: AIPerServicePtr mPerServicePtr; bool mHonored; public: Approvement(AIPerServicePtr const& per_service) : mPerServicePtr(per_service), mHonored(false) { } - ~Approvement() { honored(); } + ~Approvement() { if (!mHonored) not_honored(); } void honored(void); + void not_honored(void); }; + // The two following functions are static and have the AIPerService object passed + // as first argument as an AIPerServicePtr because that avoids the need of having + // the AIPerService object locked for the whole duration of the call. + // The functions only lock it when access is required. + + // Returns approvement if curl can handle another request for this host. + // Should return NULL if the maximum allowed HTTP bandwidth is reached, or when + // the latency between request and actual delivery becomes too large. + static Approvement* approveHTTPRequestFor(AIPerServicePtr const& per_service); + // Return true if too much bandwidth is being used. + static bool checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms); + private: // Disallow copying. AIPerService(AIPerService const&); diff --git a/indra/llmessage/aicurlprivate.h b/indra/llmessage/aicurlprivate.h index de3e6b4b1..90b04ad8f 100644 --- a/indra/llmessage/aicurlprivate.h +++ b/indra/llmessage/aicurlprivate.h @@ -130,11 +130,17 @@ class CurlEasyHandle : public boost::noncopyable, protected AICurlEasyHandleEven // Pause and unpause a connection. CURLcode pause(int bitmask); + // Called if this request should be queued on the curl thread when too much bandwidth is being used. + void setApproved(AIPerService::Approvement* approved) { mApproved = approved; } + + // Returns false when this request should be queued by the curl thread when too much bandwidth is being used. + bool approved(void) const { return mApproved; } + // Called when a request is queued for removal. In that case a race between the actual removal // and revoking of the callbacks is harmless (and happens for the raw non-statemachine version). void remove_queued(void) { mQueuedForRemoval = true; } // In case it's added after being removed. - void add_queued(void) { mQueuedForRemoval = false; } + void add_queued(void) { mQueuedForRemoval = false; if (mApproved) { mApproved->honored(); } } #ifdef DEBUG_CURLIO void debug(bool debug) { if (mDebug) debug_curl_remove_easy(mEasyHandle); if (debug) debug_curl_add_easy(mEasyHandle); mDebug = debug; } @@ -146,6 +152,7 @@ class CurlEasyHandle : public boost::noncopyable, protected AICurlEasyHandleEven mutable char* mErrorBuffer; AIPostFieldPtr mPostField; // This keeps the POSTFIELD data alive for as long as the easy handle exists. bool mQueuedForRemoval; // Set if the easy handle is (probably) added to the multi handle, but is queued for removal. + LLPointer mApproved; // When not set then the curl thread should check bandwidth usage and queue this request if too much is being used. #ifdef DEBUG_CURLIO bool mDebug; #endif @@ -375,9 +382,6 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { void resetState(void); void prepRequest(AICurlEasyRequest_wat& buffered_curl_easy_request_w, AIHTTPHeaders const& headers, LLHTTPClient::ResponderPtr responder); - // Called if this request should be queued on the curl thread when too much bandwidth is being used. - void queue_if_too_much_bandwidth_usage(void) { mQueueIfTooMuchBandwidthUsage = true; } - buffer_ptr_t& getInput(void) { return mInput; } buffer_ptr_t& getOutput(void) { return mOutput; } @@ -419,7 +423,6 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { U32 mRequestTransferedBytes; size_t mTotalRawBytes; // Raw body data (still, possibly, compressed) received from the server so far. AIBufferedCurlEasyRequestEvents* mBufferEventsTarget; - bool mQueueIfTooMuchBandwidthUsage; // Set if the curl thread should check bandwidth usage and queue this request if too much is being used. public: static LLChannelDescriptors const sChannels; // Channel object for mInput (channel out()) and mOutput (channel in()). @@ -456,9 +459,6 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { // Return true when prepRequest was already called and the object has not been // invalidated as a result of calling timed_out(). bool isValid(void) const { return mResponder; } - - // Returns true when this request should be queued by the curl thread when too much bandwidth is being used. - bool queueIfTooMuchBandwidthUsage(void) const { return mQueueIfTooMuchBandwidthUsage; } }; inline ThreadSafeBufferedCurlEasyRequest* CurlEasyRequest::get_lockobj(void) diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index cbd2bc784..70c041a1f 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -1331,8 +1331,8 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w) case cmd_boost: // FIXME: future stuff break; case cmd_add: + multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), false); PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue(); - multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request())); break; case cmd_remove: PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'. @@ -1701,14 +1701,14 @@ CURLMsg const* MultiHandle::info_read(int* msgs_in_queue) const static U32 curl_max_total_concurrent_connections = 32; // Initialized on start up by startCurlThread(). -void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) +bool MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request, bool from_queue) { bool throttled = true; // Default. AIPerServicePtr per_service; { AICurlEasyRequest_wat curl_easy_request_w(*easy_request); per_service = curl_easy_request_w->getPerServicePtr(); - bool too_much_bandwidth = curl_easy_request_w->queueIfTooMuchBandwidthUsage() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms); + bool too_much_bandwidth = !curl_easy_request_w->approved() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms); PerService_wat per_service_w(*per_service); if (!too_much_bandwidth && mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled()) { @@ -1731,7 +1731,12 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) llassert(sTotalAdded == mAddedEasyRequests.size()); Dout(dc::curl, "MultiHandle::add_easy_request: Added AICurlEasyRequest " << (void*)easy_request.get_ptr().get() << "; now processing " << mAddedEasyRequests.size() << " easy handles [running_handles = " << AICurlInterface::Stats::running_handles << "]."); - return; + return true; + } + if (from_queue) + { + // Throttled. Do not add to queue, because it is already in the queue. + return false; } // The request could not be added, we have to queue it. PerService_wat(*per_service)->queue(easy_request); @@ -1739,6 +1744,7 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) // Not active yet, but it's no longer an error if next we try to remove the request. AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = false; #endif + return true; } CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request, bool as_per_command) @@ -2608,7 +2614,7 @@ bool AIPerService::sNoHTTPBandwidthThrottling; // running requests (in MultiHandle::mAddedEasyRequests)). // //static -bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) +AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr const& per_service) { using namespace AICurlPrivate; using namespace AICurlPrivate::curlthread; @@ -2684,7 +2690,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) if (reject) { // Too many request for this service already. - return false; + return NULL; } // Throttle on bandwidth usage. @@ -2692,7 +2698,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) { // Too much bandwidth is being used, either in total or for this service. PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all. - return false; + return NULL; } // Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate. @@ -2726,8 +2732,9 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) if (reject) { PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all. + return NULL; } - return !reject; + return new Approvement(per_service); } bool AIPerService::checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms) diff --git a/indra/llmessage/aicurlthread.h b/indra/llmessage/aicurlthread.h index 9f71b015c..9a4c2b487 100644 --- a/indra/llmessage/aicurlthread.h +++ b/indra/llmessage/aicurlthread.h @@ -59,7 +59,7 @@ class MultiHandle : public CurlMultiHandle ~MultiHandle(); // Add/remove an easy handle to/from a multi session. - void add_easy_request(AICurlEasyRequest const& easy_request); + bool add_easy_request(AICurlEasyRequest const& easy_request, bool from_queue); CURLMcode remove_easy_request(AICurlEasyRequest const& easy_request, bool as_per_command = false); // Reads/writes available data from a particular socket (non-blocking). diff --git a/indra/llmessage/llhttpclient.cpp b/indra/llmessage/llhttpclient.cpp index d5032e8d7..dfbc9e249 100644 --- a/indra/llmessage/llhttpclient.cpp +++ b/indra/llmessage/llhttpclient.cpp @@ -201,15 +201,15 @@ void LLHTTPClient::request( LLURLRequest::ERequestAction method, Injector* body_injector, LLHTTPClient::ResponderPtr responder, - AIHTTPHeaders& headers/*,*/ + AIHTTPHeaders& headers, + AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, EDoesAuthentication does_auth, EAllowCompressedReply allow_compression, AIStateMachine* parent, AIStateMachine::state_type new_parent_state, - AIEngine* default_engine, - bool queue_if_too_much_bandwidth_usage) + AIEngine* default_engine) { llassert(responder); @@ -222,7 +222,7 @@ void LLHTTPClient::request( LLURLRequest* req; try { - req = new LLURLRequest(method, url, body_injector, responder, headers, keepalive, does_auth, allow_compression, queue_if_too_much_bandwidth_usage); + req = new LLURLRequest(method, url, body_injector, responder, headers, approved, keepalive, does_auth, allow_compression); #ifdef DEBUG_CURLIO req->mCurlEasyRequest.debug(debug); #endif @@ -243,22 +243,22 @@ void LLHTTPClient::getByteRange(std::string const& url, S32 offset, S32 bytes, R { headers.addHeader("Range", llformat("bytes=%d-%d", offset, offset + bytes - 1)); } - request(url, HTTP_GET, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_GET, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } void LLHTTPClient::head(std::string const& url, ResponderHeadersOnly* responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { - request(url, HTTP_HEAD, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_HEAD, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } void LLHTTPClient::get(std::string const& url, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { - request(url, HTTP_GET, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_GET, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } void LLHTTPClient::getHeaderOnly(std::string const& url, ResponderHeadersOnly* responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { - request(url, HTTP_HEAD, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_HEAD, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } void LLHTTPClient::get(std::string const& url, LLSD const& query, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) @@ -694,22 +694,22 @@ U32 LLHTTPClient::blockingGetRaw(const std::string& url, std::string& body/*,*/ void LLHTTPClient::put(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { - request(url, HTTP_PUT, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), no_keep_alive, no_does_authentication, no_allow_compressed_reply); + request(url, HTTP_PUT, new LLSDInjector(body), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), no_keep_alive, no_does_authentication, no_allow_compressed_reply); } void LLHTTPClient::post(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state) { - request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state); + request(url, HTTP_POST, new LLSDInjector(body), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state); } -void LLHTTPClient::post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state) +void LLHTTPClient::post_approved(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers, AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state) { - request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state, &gMainThreadEngine, false); + request(url, HTTP_POST, new LLSDInjector(body), responder, headers, approved/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state, &gMainThreadEngine); } void LLHTTPClient::postXMLRPC(std::string const& url, XMLRPC_REQUEST xmlrpc_request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) { - request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, allow_compressed_reply); + request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, allow_compressed_reply); } void LLHTTPClient::postXMLRPC(std::string const& url, char const* method, XMLRPC_VALUE value, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) @@ -720,33 +720,33 @@ void LLHTTPClient::postXMLRPC(std::string const& url, char const* method, XMLRPC XMLRPC_RequestSetData(xmlrpc_request, value); // XMLRPCInjector takes ownership of xmlrpc_request and will free it when done. // LLURLRequest takes ownership of the XMLRPCInjector object and will free it when done. - request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, no_allow_compressed_reply); + request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, no_allow_compressed_reply); } void LLHTTPClient::postRaw(std::string const& url, char const* data, S32 size, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) { - request(url, HTTP_POST, new RawInjector(data, size), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); + request(url, HTTP_POST, new RawInjector(data, size), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); } void LLHTTPClient::postFile(std::string const& url, std::string const& filename, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) { - request(url, HTTP_POST, new FileInjector(filename), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); + request(url, HTTP_POST, new FileInjector(filename), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); } void LLHTTPClient::postFile(std::string const& url, LLUUID const& uuid, LLAssetType::EType asset_type, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) { - request(url, HTTP_POST, new VFileInjector(uuid, asset_type), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); + request(url, HTTP_POST, new VFileInjector(uuid, asset_type), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); } // static void LLHTTPClient::del(std::string const& url, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { - request(url, HTTP_DELETE, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_DELETE, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } // static void LLHTTPClient::move(std::string const& url, std::string const& destination, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { headers.addHeader("Destination", destination); - request(url, HTTP_MOVE, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_MOVE, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } diff --git a/indra/llmessage/llhttpclient.h b/indra/llmessage/llhttpclient.h index 05e028f04..e376530e5 100644 --- a/indra/llmessage/llhttpclient.h +++ b/indra/llmessage/llhttpclient.h @@ -38,6 +38,7 @@ #include "llassettype.h" #include "llhttpstatuscodes.h" #include "aihttpheaders.h" +#include "aicurlperservice.h" class LLUUID; class LLPumpIO; @@ -426,15 +427,15 @@ public: ERequestAction method, Injector* body_injector, ResponderPtr responder, - AIHTTPHeaders& headers/*,*/ + AIHTTPHeaders& headers, + AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive = keep_alive, EDoesAuthentication does_auth = no_does_authentication, EAllowCompressedReply allow_compression = allow_compressed_reply, AIStateMachine* parent = NULL, /*AIStateMachine::state_type*/ U32 new_parent_state = 0, - AIEngine* default_engine = &gMainThreadEngine, - bool queue_if_too_much_bandwidth_usage = true); + AIEngine* default_engine = &gMainThreadEngine); /** @name non-blocking API */ //@{ @@ -466,9 +467,9 @@ public: static void post(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0) { AIHTTPHeaders headers; post(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); } - static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0); - static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0) - { AIHTTPHeaders headers; post_nb(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); } + static void post_approved(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers, AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0); + static void post_approved(std::string const& url, LLSD const& body, ResponderPtr responder, AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0) + { AIHTTPHeaders headers; post_approved(url, body, responder, headers, approved/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); } /** Takes ownership of request and deletes it when sent */ static void postXMLRPC(std::string const& url, XMLRPC_REQUEST request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive); diff --git a/indra/llmessage/llurlrequest.cpp b/indra/llmessage/llurlrequest.cpp index 2d4438804..b10b0c370 100644 --- a/indra/llmessage/llurlrequest.cpp +++ b/indra/llmessage/llurlrequest.cpp @@ -75,14 +75,14 @@ std::string LLURLRequest::actionAsVerb(LLURLRequest::ERequestAction action) // This might throw AICurlNoEasyHandle. LLURLRequest::LLURLRequest(LLURLRequest::ERequestAction action, std::string const& url, Injector* body, - LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool compression, - bool queue_if_too_much_bandwidth_usage) : + LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, AIPerService::Approvement* approved, + bool keepalive, bool is_auth, bool compression) : mAction(action), mURL(url), mKeepAlive(keepalive), mIsAuth(is_auth), mNoCompression(!compression), mBody(body), mResponder(responder), mHeaders(headers), mResponderNameCache(responder ? responder->getName() : "") { - if (queue_if_too_much_bandwidth_usage) + if (approved) { - AICurlEasyRequest_wat(*mCurlEasyRequest)->queue_if_too_much_bandwidth_usage(); + AICurlEasyRequest_wat(*mCurlEasyRequest)->setApproved(approved); } } diff --git a/indra/llmessage/llurlrequest.h b/indra/llmessage/llurlrequest.h index d31cc1918..de92156c6 100644 --- a/indra/llmessage/llurlrequest.h +++ b/indra/llmessage/llurlrequest.h @@ -66,7 +66,8 @@ class LLURLRequest : public AICurlEasyRequestStateMachine { */ LLURLRequest(ERequestAction action, std::string const& url, Injector* body, LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, - bool keepalive, bool is_auth, bool no_compression, bool queue_if_too_much_bandwidth_usage); + AIPerService::Approvement* approved, + bool keepalive, bool is_auth, bool no_compression); /** * @brief Cached value of responder->getName() as passed to the constructor. diff --git a/indra/newview/llinventorymodelbackgroundfetch.cpp b/indra/newview/llinventorymodelbackgroundfetch.cpp index 0f1d93c5c..dba9c9836 100644 --- a/indra/newview/llinventorymodelbackgroundfetch.cpp +++ b/indra/newview/llinventorymodelbackgroundfetch.cpp @@ -204,7 +204,8 @@ void LLInventoryModelBackgroundFetch::backgroundFetch() std::string url = region->getCapability("FetchInventory2"); if (!url.empty()) { - if (!mPerServicePtr) + bool mPerServicePtr_initialized = !!mPerServicePtr; + if (!mPerServicePtr_initialized) { // One time initialization needed for bulkFetch(). std::string servicename = AIPerService::extract_canonical_servicename(url); @@ -212,10 +213,14 @@ void LLInventoryModelBackgroundFetch::backgroundFetch() { llinfos << "Initialized service name for bulk inventory fetching with \"" << servicename << "\"." << llendl; mPerServicePtr = AIPerService::instance(servicename); + mPerServicePtr_initialized = true; } } - bulkFetch(); - return; + if (mPerServicePtr_initialized) + { + bulkFetch(); + return; + } } } @@ -600,30 +605,29 @@ void LLInventoryModelBackgroundFetch::bulkFetch() LLViewerRegion* region = gAgent.getRegion(); if (gDisconnected || !region) return; - if (!AIPerService::wantsMoreHTTPRequestsFor(mPerServicePtr)) - { - return; // Wait. - } - // If AIPerService::wantsMoreHTTPRequestsFor returns true, then it approved ONE request. - // The code below might fire off zero, one or even more than one requests however! - // This object keeps track of that. - AIPerService::Approvement approvement(mPerServicePtr); - - U32 item_count=0; - U32 folder_count=0; - U32 max_batch_size=5; + U32 const max_batch_size = 5; U32 sort_order = gSavedSettings.getU32(LLInventoryPanel::DEFAULT_SORT_ORDER) & 0x1; uuid_vec_t recursive_cats; + U32 folder_count=0; + U32 folder_lib_count=0; + U32 item_count=0; + U32 item_lib_count=0; + + // This function can do up to four requests at once. + LLPointer approved_folder; + LLPointer approved_folder_lib; + LLPointer approved_item; + LLPointer approved_item_lib; + LLSD folder_request_body; LLSD folder_request_body_lib; LLSD item_request_body; LLSD item_request_body_lib; - while (!mFetchQueue.empty() - && (item_count + folder_count) < max_batch_size) + while (!mFetchQueue.empty()) { const FetchQueueInfo& fetch_info = mFetchQueue.front(); if (fetch_info.mIsCategory) @@ -645,10 +649,27 @@ void LLInventoryModelBackgroundFetch::bulkFetch() folder_sd["fetch_items"] = (LLSD::Boolean)TRUE; if (ALEXANDRIA_LINDEN_ID == cat->getOwnerID()) + { + if (folder_lib_count == max_batch_size || + (folder_lib_count == 0 && + !(approved_folder_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + { + break; + } folder_request_body_lib["folders"].append(folder_sd); + ++folder_lib_count; + } else + { + if (folder_count == max_batch_size || + (folder_count == 0 && + !(approved_folder = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + { + break; + } folder_request_body["folders"].append(folder_sd); - folder_count++; + ++folder_count; + } } // May already have this folder, but append child folders to list. if (fetch_info.mRecursive) @@ -678,77 +699,69 @@ void LLInventoryModelBackgroundFetch::bulkFetch() item_sd["item_id"] = itemp->getUUID(); if (itemp->getPermissions().getOwner() == gAgent.getID()) { + if (item_count == max_batch_size || + (item_count == 0 && + !(approved_item = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + { + break; + } item_request_body.append(item_sd); + ++item_count; } else { + if (item_lib_count == max_batch_size || + (item_lib_count == 0 && + !(approved_item_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + { + break; + } item_request_body_lib.append(item_sd); + ++item_lib_count; } - //itemp->fetchFromServer(); - item_count++; } } mFetchQueue.pop_front(); } - if (item_count + folder_count > 0) + if (item_count + folder_count + item_lib_count + folder_lib_count > 0) { if (folder_count) { std::string url = region->getCapability("FetchInventoryDescendents2"); - mFetchCount++; - if (folder_request_body["folders"].size()) - { - LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body, recursive_cats); - LLHTTPClient::post_nb(url, folder_request_body, fetcher); - approvement.honored(); - } - if (folder_request_body_lib["folders"].size()) - { - std::string url_lib = gAgent.getRegion()->getCapability("FetchLibDescendents2"); - - LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body_lib, recursive_cats); - LLHTTPClient::post_nb(url_lib, folder_request_body_lib, fetcher); - approvement.honored(); - } + llassert(!url.empty()); + ++mFetchCount; + LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body, recursive_cats); + LLHTTPClient::post_approved(url, folder_request_body, fetcher, approved_folder); + } + if (folder_lib_count) + { + std::string url = gAgent.getRegion()->getCapability("FetchLibDescendents2"); + llassert(!url.empty()); + ++mFetchCount; + LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body_lib, recursive_cats); + LLHTTPClient::post_approved(url, folder_request_body_lib, fetcher, approved_folder_lib); } if (item_count) { - std::string url; - - if (item_request_body.size()) - { - mFetchCount++; - url = region->getCapability("FetchInventory2"); - llassert(!url.empty()); - if (!url.empty()) - { - LLSD body; - body["agent_id"] = gAgent.getID(); - body["items"] = item_request_body; - - LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body)); - approvement.honored(); - } - } - - if (item_request_body_lib.size()) - { - mFetchCount++; - - url = region->getCapability("FetchLib2"); - llassert(!url.empty()); - if (!url.empty()) - { - LLSD body; - body["agent_id"] = gAgent.getID(); - body["items"] = item_request_body_lib; - - LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body)); - approvement.honored(); - } - } + std::string url = region->getCapability("FetchInventory2"); + llassert(!url.empty()); + ++mFetchCount; + LLSD body; + body["agent_id"] = gAgent.getID(); + body["items"] = item_request_body; + LLHTTPClient::post_approved(url, body, new LLInventoryModelFetchItemResponder(body), approved_item); + } + if (item_lib_count) + { + std::string url = region->getCapability("FetchLib2"); + llassert(!url.empty()); + ++mFetchCount; + LLSD body; + body["agent_id"] = gAgent.getID(); + body["items"] = item_request_body_lib; + LLHTTPClient::post_approved(url, body, new LLInventoryModelFetchItemResponder(body), approved_item_lib); } mFetchTimer.reset(); } diff --git a/indra/newview/lltexturefetch.cpp b/indra/newview/lltexturefetch.cpp index 56a6267b2..0d58442c8 100644 --- a/indra/newview/lltexturefetch.cpp +++ b/indra/newview/lltexturefetch.cpp @@ -1272,13 +1272,14 @@ bool LLTextureFetchWorker::doWork(S32 param) } // Let AICurl decide if we can process more HTTP requests at the moment or not. - if (!AIPerService::wantsMoreHTTPRequestsFor(mPerServicePtr)) + + // AIPerService::approveHTTPRequestFor returns approvement for ONE request. + // This object keeps track of whether or not that is honored. + LLPointer approved = AIPerService::approveHTTPRequestFor(mPerServicePtr); + if (!approved) { return false ; //wait. } - // If AIPerService::wantsMoreHTTPRequestsFor returns true then it approved ONE request. - // This object keeps track of whether or not that is honored. - AIPerService::Approvement approvement(mPerServicePtr); mFetcher->removeFromNetworkQueue(this, false); @@ -1324,9 +1325,7 @@ bool LLTextureFetchWorker::doWork(S32 param) } LLHTTPClient::request(mUrl, LLHTTPClient::HTTP_GET, NULL, new HTTPGetResponder(mFetcher, mID, LLTimer::getTotalTime(), mRequestedSize, mRequestedOffset, true), - headers/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL, false); - // Now the request was added to the command queue. - approvement.honored(); + headers, approved/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL); res = true; } if (!res)