From 3d63f9cd24c8e4ee09a877a50ca03cb552c63bcd Mon Sep 17 00:00:00 2001 From: Aleric Inglewood Date: Sat, 11 May 2013 16:04:34 +0200 Subject: [PATCH 1/5] WIP on inventory background fetch --- indra/newview/llinventorymodelbackgroundfetch.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/indra/newview/llinventorymodelbackgroundfetch.cpp b/indra/newview/llinventorymodelbackgroundfetch.cpp index 0f1d93c5c..047c6cd5d 100644 --- a/indra/newview/llinventorymodelbackgroundfetch.cpp +++ b/indra/newview/llinventorymodelbackgroundfetch.cpp @@ -204,7 +204,8 @@ void LLInventoryModelBackgroundFetch::backgroundFetch() std::string url = region->getCapability("FetchInventory2"); if (!url.empty()) { - if (!mPerServicePtr) + bool mPerServicePtr_initialized = !!mPerServicePtr; + if (!mPerServicePtr_initialized) { // One time initialization needed for bulkFetch(). std::string servicename = AIPerService::extract_canonical_servicename(url); @@ -212,10 +213,14 @@ void LLInventoryModelBackgroundFetch::backgroundFetch() { llinfos << "Initialized service name for bulk inventory fetching with \"" << servicename << "\"." << llendl; mPerServicePtr = AIPerService::instance(servicename); + mPerServicePtr_initialized = true; } } - bulkFetch(); - return; + if (mPerServicePtr_initialized) + { + bulkFetch(); + return; + } } } @@ -698,6 +703,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() { std::string url = region->getCapability("FetchInventoryDescendents2"); mFetchCount++; + llassert(!url.empty()); if (folder_request_body["folders"].size()) { LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body, recursive_cats); From 929badb110f04982cd85695db714b59fdc5eb5fd Mon Sep 17 00:00:00 2001 From: Aleric Inglewood Date: Sun, 12 May 2013 04:19:44 +0200 Subject: [PATCH 2/5] Let statemachine honor approvements. The inventory bulk fetch is not thread-safe, so the it doesn't start right away, causing the approvement not to be honored upon return from post_approved (formerly post_nb). This patch renames wantsMoreHTTPReqestsFor to approveHTTPRequestFor, and has it return NULL or a AIPerService::Approvement object. The latter is now passed to the CurlEasyHandle object instead of just a boolean mQueueIfTooMuchBandwidthUsage, and then the Approvement is honored by the state machine right after the request is actually added to the command queue. This should avoid a flood of inventory requests in the case approveHTTPRequestFor is called multiple times before the main thread adds the requests to the command queue. I don't think that actually ever happens, but I added debug code (to find some problem) that is so damn strictly checking everything that I need to be this precise in order to do that testing. --- indra/llmessage/aicurl.cpp | 2 +- indra/llmessage/aicurlperservice.cpp | 6 +++ indra/llmessage/aicurlperservice.h | 33 ++++++++-------- indra/llmessage/aicurlprivate.h | 16 ++++---- indra/llmessage/aicurlthread.cpp | 13 ++++--- indra/llmessage/llhttpclient.cpp | 38 +++++++++---------- indra/llmessage/llhttpclient.h | 13 ++++--- indra/llmessage/llurlrequest.cpp | 8 ++-- indra/llmessage/llurlrequest.h | 3 +- .../llinventorymodelbackgroundfetch.cpp | 22 +++++------ indra/newview/lltexturefetch.cpp | 13 +++---- 11 files changed, 86 insertions(+), 81 deletions(-) diff --git a/indra/llmessage/aicurl.cpp b/indra/llmessage/aicurl.cpp index a9c15c5c4..248568d7e 100644 --- a/indra/llmessage/aicurl.cpp +++ b/indra/llmessage/aicurl.cpp @@ -1270,7 +1270,7 @@ bool BufferedCurlEasyRequest::sShuttingDown = false; AIAverage BufferedCurlEasyRequest::sHTTPBandwidth(25); BufferedCurlEasyRequest::BufferedCurlEasyRequest() : - mRequestTransferedBytes(0), mTotalRawBytes(0), mStatus(HTTP_INTERNAL_ERROR_OTHER), mBufferEventsTarget(NULL), mQueueIfTooMuchBandwidthUsage(false) + mRequestTransferedBytes(0), mTotalRawBytes(0), mStatus(HTTP_INTERNAL_ERROR_OTHER), mBufferEventsTarget(NULL) { AICurlInterface::Stats::BufferedCurlEasyRequest_count++; } diff --git a/indra/llmessage/aicurlperservice.cpp b/indra/llmessage/aicurlperservice.cpp index 8f8573ea6..8cf62bc2d 100644 --- a/indra/llmessage/aicurlperservice.cpp +++ b/indra/llmessage/aicurlperservice.cpp @@ -366,3 +366,9 @@ void AIPerService::Approvement::honored(void) } } +void AIPerService::Approvement::not_honored(void) +{ + honored(); + llwarns << "Approvement for has not been honored." << llendl; +} + diff --git a/indra/llmessage/aicurlperservice.h b/indra/llmessage/aicurlperservice.h index eab41ad80..5bcc28063 100644 --- a/indra/llmessage/aicurlperservice.h +++ b/indra/llmessage/aicurlperservice.h @@ -114,7 +114,7 @@ class AIPerService { private: typedef std::deque queued_request_type; - int mApprovedRequests; // The number of approved requests by wantsMoreHTTPRequestsFor that were not added to the command queue yet. + int mApprovedRequests; // The number of approved requests by approveHTTPRequestFor that were not added to the command queue yet. int mQueuedCommands; // Number of add commands (minus remove commands) with this host in the command queue. int mAdded; // Number of active easy handles with this host. queued_request_type mQueuedRequests; // Waiting (throttled) requests. @@ -201,29 +201,30 @@ class AIPerService { // Called when CurlConcurrentConnectionsPerService changes. static void adjust_concurrent_connections(int increment); - // The two following functions are static and have the AIPerService object passed - // as first argument as an AIPerServicePtr because that avoids the need of having - // the AIPerService object locked for the whole duration of the call. - // The functions only lock it when access is required. - - // Returns true if curl can handle another request for this host. - // Should return false if the maximum allowed HTTP bandwidth is reached, or when - // the latency between request and actual delivery becomes too large. - static bool wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service); - // Return true if too much bandwidth is being used. - static bool checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms); - - // A helper class to decrement mApprovedRequests after requests approved by wantsMoreHTTPRequestsFor were handled. - class Approvement { + // A helper class to decrement mApprovedRequests after requests approved by approveHTTPRequestFor were handled. + class Approvement : public LLThreadSafeRefCount { private: AIPerServicePtr mPerServicePtr; bool mHonored; public: Approvement(AIPerServicePtr const& per_service) : mPerServicePtr(per_service), mHonored(false) { } - ~Approvement() { honored(); } + ~Approvement() { if (!mHonored) not_honored(); } void honored(void); + void not_honored(void); }; + // The two following functions are static and have the AIPerService object passed + // as first argument as an AIPerServicePtr because that avoids the need of having + // the AIPerService object locked for the whole duration of the call. + // The functions only lock it when access is required. + + // Returns approvement if curl can handle another request for this host. + // Should return NULL if the maximum allowed HTTP bandwidth is reached, or when + // the latency between request and actual delivery becomes too large. + static Approvement* approveHTTPRequestFor(AIPerServicePtr const& per_service); + // Return true if too much bandwidth is being used. + static bool checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms); + private: // Disallow copying. AIPerService(AIPerService const&); diff --git a/indra/llmessage/aicurlprivate.h b/indra/llmessage/aicurlprivate.h index de3e6b4b1..db174445a 100644 --- a/indra/llmessage/aicurlprivate.h +++ b/indra/llmessage/aicurlprivate.h @@ -130,11 +130,17 @@ class CurlEasyHandle : public boost::noncopyable, protected AICurlEasyHandleEven // Pause and unpause a connection. CURLcode pause(int bitmask); + // Called if this request should be queued on the curl thread when too much bandwidth is being used. + void setApproved(AIPerService::Approvement* approved) { mApproved = approved; } + + // Returns false when this request should be queued by the curl thread when too much bandwidth is being used. + bool approved(void) const { return mApproved; } + // Called when a request is queued for removal. In that case a race between the actual removal // and revoking of the callbacks is harmless (and happens for the raw non-statemachine version). void remove_queued(void) { mQueuedForRemoval = true; } // In case it's added after being removed. - void add_queued(void) { mQueuedForRemoval = false; } + void add_queued(void) { mQueuedForRemoval = false; if (mApproved) { mApproved->honored(); mApproved = NULL; } } #ifdef DEBUG_CURLIO void debug(bool debug) { if (mDebug) debug_curl_remove_easy(mEasyHandle); if (debug) debug_curl_add_easy(mEasyHandle); mDebug = debug; } @@ -146,6 +152,7 @@ class CurlEasyHandle : public boost::noncopyable, protected AICurlEasyHandleEven mutable char* mErrorBuffer; AIPostFieldPtr mPostField; // This keeps the POSTFIELD data alive for as long as the easy handle exists. bool mQueuedForRemoval; // Set if the easy handle is (probably) added to the multi handle, but is queued for removal. + LLPointer mApproved; // When not set then the curl thread should check bandwidth usage and queue this request if too much is being used. #ifdef DEBUG_CURLIO bool mDebug; #endif @@ -375,9 +382,6 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { void resetState(void); void prepRequest(AICurlEasyRequest_wat& buffered_curl_easy_request_w, AIHTTPHeaders const& headers, LLHTTPClient::ResponderPtr responder); - // Called if this request should be queued on the curl thread when too much bandwidth is being used. - void queue_if_too_much_bandwidth_usage(void) { mQueueIfTooMuchBandwidthUsage = true; } - buffer_ptr_t& getInput(void) { return mInput; } buffer_ptr_t& getOutput(void) { return mOutput; } @@ -419,7 +423,6 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { U32 mRequestTransferedBytes; size_t mTotalRawBytes; // Raw body data (still, possibly, compressed) received from the server so far. AIBufferedCurlEasyRequestEvents* mBufferEventsTarget; - bool mQueueIfTooMuchBandwidthUsage; // Set if the curl thread should check bandwidth usage and queue this request if too much is being used. public: static LLChannelDescriptors const sChannels; // Channel object for mInput (channel out()) and mOutput (channel in()). @@ -456,9 +459,6 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { // Return true when prepRequest was already called and the object has not been // invalidated as a result of calling timed_out(). bool isValid(void) const { return mResponder; } - - // Returns true when this request should be queued by the curl thread when too much bandwidth is being used. - bool queueIfTooMuchBandwidthUsage(void) const { return mQueueIfTooMuchBandwidthUsage; } }; inline ThreadSafeBufferedCurlEasyRequest* CurlEasyRequest::get_lockobj(void) diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index cbd2bc784..e0e89d429 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -1331,8 +1331,8 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w) case cmd_boost: // FIXME: future stuff break; case cmd_add: - PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue(); multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request())); + PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue(); break; case cmd_remove: PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'. @@ -1708,7 +1708,7 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) { AICurlEasyRequest_wat curl_easy_request_w(*easy_request); per_service = curl_easy_request_w->getPerServicePtr(); - bool too_much_bandwidth = curl_easy_request_w->queueIfTooMuchBandwidthUsage() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms); + bool too_much_bandwidth = !curl_easy_request_w->approved() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms); PerService_wat per_service_w(*per_service); if (!too_much_bandwidth && mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled()) { @@ -2608,7 +2608,7 @@ bool AIPerService::sNoHTTPBandwidthThrottling; // running requests (in MultiHandle::mAddedEasyRequests)). // //static -bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) +AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr const& per_service) { using namespace AICurlPrivate; using namespace AICurlPrivate::curlthread; @@ -2684,7 +2684,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) if (reject) { // Too many request for this service already. - return false; + return NULL; } // Throttle on bandwidth usage. @@ -2692,7 +2692,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) { // Too much bandwidth is being used, either in total or for this service. PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all. - return false; + return NULL; } // Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate. @@ -2726,8 +2726,9 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service) if (reject) { PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all. + return NULL; } - return !reject; + return new Approvement(per_service); } bool AIPerService::checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms) diff --git a/indra/llmessage/llhttpclient.cpp b/indra/llmessage/llhttpclient.cpp index d5032e8d7..dfbc9e249 100644 --- a/indra/llmessage/llhttpclient.cpp +++ b/indra/llmessage/llhttpclient.cpp @@ -201,15 +201,15 @@ void LLHTTPClient::request( LLURLRequest::ERequestAction method, Injector* body_injector, LLHTTPClient::ResponderPtr responder, - AIHTTPHeaders& headers/*,*/ + AIHTTPHeaders& headers, + AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, EDoesAuthentication does_auth, EAllowCompressedReply allow_compression, AIStateMachine* parent, AIStateMachine::state_type new_parent_state, - AIEngine* default_engine, - bool queue_if_too_much_bandwidth_usage) + AIEngine* default_engine) { llassert(responder); @@ -222,7 +222,7 @@ void LLHTTPClient::request( LLURLRequest* req; try { - req = new LLURLRequest(method, url, body_injector, responder, headers, keepalive, does_auth, allow_compression, queue_if_too_much_bandwidth_usage); + req = new LLURLRequest(method, url, body_injector, responder, headers, approved, keepalive, does_auth, allow_compression); #ifdef DEBUG_CURLIO req->mCurlEasyRequest.debug(debug); #endif @@ -243,22 +243,22 @@ void LLHTTPClient::getByteRange(std::string const& url, S32 offset, S32 bytes, R { headers.addHeader("Range", llformat("bytes=%d-%d", offset, offset + bytes - 1)); } - request(url, HTTP_GET, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_GET, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } void LLHTTPClient::head(std::string const& url, ResponderHeadersOnly* responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { - request(url, HTTP_HEAD, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_HEAD, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } void LLHTTPClient::get(std::string const& url, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { - request(url, HTTP_GET, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_GET, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } void LLHTTPClient::getHeaderOnly(std::string const& url, ResponderHeadersOnly* responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { - request(url, HTTP_HEAD, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_HEAD, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } void LLHTTPClient::get(std::string const& url, LLSD const& query, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) @@ -694,22 +694,22 @@ U32 LLHTTPClient::blockingGetRaw(const std::string& url, std::string& body/*,*/ void LLHTTPClient::put(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { - request(url, HTTP_PUT, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), no_keep_alive, no_does_authentication, no_allow_compressed_reply); + request(url, HTTP_PUT, new LLSDInjector(body), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), no_keep_alive, no_does_authentication, no_allow_compressed_reply); } void LLHTTPClient::post(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state) { - request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state); + request(url, HTTP_POST, new LLSDInjector(body), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state); } -void LLHTTPClient::post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state) +void LLHTTPClient::post_approved(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers, AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state) { - request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state, &gMainThreadEngine, false); + request(url, HTTP_POST, new LLSDInjector(body), responder, headers, approved/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state, &gMainThreadEngine); } void LLHTTPClient::postXMLRPC(std::string const& url, XMLRPC_REQUEST xmlrpc_request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) { - request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, allow_compressed_reply); + request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, allow_compressed_reply); } void LLHTTPClient::postXMLRPC(std::string const& url, char const* method, XMLRPC_VALUE value, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) @@ -720,33 +720,33 @@ void LLHTTPClient::postXMLRPC(std::string const& url, char const* method, XMLRPC XMLRPC_RequestSetData(xmlrpc_request, value); // XMLRPCInjector takes ownership of xmlrpc_request and will free it when done. // LLURLRequest takes ownership of the XMLRPCInjector object and will free it when done. - request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, no_allow_compressed_reply); + request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, no_allow_compressed_reply); } void LLHTTPClient::postRaw(std::string const& url, char const* data, S32 size, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) { - request(url, HTTP_POST, new RawInjector(data, size), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); + request(url, HTTP_POST, new RawInjector(data, size), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); } void LLHTTPClient::postFile(std::string const& url, std::string const& filename, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) { - request(url, HTTP_POST, new FileInjector(filename), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); + request(url, HTTP_POST, new FileInjector(filename), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); } void LLHTTPClient::postFile(std::string const& url, LLUUID const& uuid, LLAssetType::EType asset_type, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive) { - request(url, HTTP_POST, new VFileInjector(uuid, asset_type), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); + request(url, HTTP_POST, new VFileInjector(uuid, asset_type), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive); } // static void LLHTTPClient::del(std::string const& url, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { - request(url, HTTP_DELETE, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_DELETE, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } // static void LLHTTPClient::move(std::string const& url, std::string const& destination, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug)) { headers.addHeader("Destination", destination); - request(url, HTTP_MOVE, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug)); + request(url, HTTP_MOVE, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug)); } diff --git a/indra/llmessage/llhttpclient.h b/indra/llmessage/llhttpclient.h index 05e028f04..e376530e5 100644 --- a/indra/llmessage/llhttpclient.h +++ b/indra/llmessage/llhttpclient.h @@ -38,6 +38,7 @@ #include "llassettype.h" #include "llhttpstatuscodes.h" #include "aihttpheaders.h" +#include "aicurlperservice.h" class LLUUID; class LLPumpIO; @@ -426,15 +427,15 @@ public: ERequestAction method, Injector* body_injector, ResponderPtr responder, - AIHTTPHeaders& headers/*,*/ + AIHTTPHeaders& headers, + AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive = keep_alive, EDoesAuthentication does_auth = no_does_authentication, EAllowCompressedReply allow_compression = allow_compressed_reply, AIStateMachine* parent = NULL, /*AIStateMachine::state_type*/ U32 new_parent_state = 0, - AIEngine* default_engine = &gMainThreadEngine, - bool queue_if_too_much_bandwidth_usage = true); + AIEngine* default_engine = &gMainThreadEngine); /** @name non-blocking API */ //@{ @@ -466,9 +467,9 @@ public: static void post(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0) { AIHTTPHeaders headers; post(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); } - static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0); - static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0) - { AIHTTPHeaders headers; post_nb(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); } + static void post_approved(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers, AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0); + static void post_approved(std::string const& url, LLSD const& body, ResponderPtr responder, AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0) + { AIHTTPHeaders headers; post_approved(url, body, responder, headers, approved/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); } /** Takes ownership of request and deletes it when sent */ static void postXMLRPC(std::string const& url, XMLRPC_REQUEST request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive); diff --git a/indra/llmessage/llurlrequest.cpp b/indra/llmessage/llurlrequest.cpp index 2d4438804..b10b0c370 100644 --- a/indra/llmessage/llurlrequest.cpp +++ b/indra/llmessage/llurlrequest.cpp @@ -75,14 +75,14 @@ std::string LLURLRequest::actionAsVerb(LLURLRequest::ERequestAction action) // This might throw AICurlNoEasyHandle. LLURLRequest::LLURLRequest(LLURLRequest::ERequestAction action, std::string const& url, Injector* body, - LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool compression, - bool queue_if_too_much_bandwidth_usage) : + LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, AIPerService::Approvement* approved, + bool keepalive, bool is_auth, bool compression) : mAction(action), mURL(url), mKeepAlive(keepalive), mIsAuth(is_auth), mNoCompression(!compression), mBody(body), mResponder(responder), mHeaders(headers), mResponderNameCache(responder ? responder->getName() : "") { - if (queue_if_too_much_bandwidth_usage) + if (approved) { - AICurlEasyRequest_wat(*mCurlEasyRequest)->queue_if_too_much_bandwidth_usage(); + AICurlEasyRequest_wat(*mCurlEasyRequest)->setApproved(approved); } } diff --git a/indra/llmessage/llurlrequest.h b/indra/llmessage/llurlrequest.h index d31cc1918..de92156c6 100644 --- a/indra/llmessage/llurlrequest.h +++ b/indra/llmessage/llurlrequest.h @@ -66,7 +66,8 @@ class LLURLRequest : public AICurlEasyRequestStateMachine { */ LLURLRequest(ERequestAction action, std::string const& url, Injector* body, LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, - bool keepalive, bool is_auth, bool no_compression, bool queue_if_too_much_bandwidth_usage); + AIPerService::Approvement* approved, + bool keepalive, bool is_auth, bool no_compression); /** * @brief Cached value of responder->getName() as passed to the constructor. diff --git a/indra/newview/llinventorymodelbackgroundfetch.cpp b/indra/newview/llinventorymodelbackgroundfetch.cpp index 047c6cd5d..0e26dabf5 100644 --- a/indra/newview/llinventorymodelbackgroundfetch.cpp +++ b/indra/newview/llinventorymodelbackgroundfetch.cpp @@ -605,14 +605,14 @@ void LLInventoryModelBackgroundFetch::bulkFetch() LLViewerRegion* region = gAgent.getRegion(); if (gDisconnected || !region) return; - if (!AIPerService::wantsMoreHTTPRequestsFor(mPerServicePtr)) + // AIPerService::approveHTTPRequestFor returns approvement for ONE request. + // The code below might fire off zero, one or even more than one requests however! + // This object keeps track of that. + LLPointer approved = AIPerService::approveHTTPRequestFor(mPerServicePtr); + if (!approved) { return; // Wait. } - // If AIPerService::wantsMoreHTTPRequestsFor returns true, then it approved ONE request. - // The code below might fire off zero, one or even more than one requests however! - // This object keeps track of that. - AIPerService::Approvement approvement(mPerServicePtr); U32 item_count=0; U32 folder_count=0; @@ -707,16 +707,14 @@ void LLInventoryModelBackgroundFetch::bulkFetch() if (folder_request_body["folders"].size()) { LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body, recursive_cats); - LLHTTPClient::post_nb(url, folder_request_body, fetcher); - approvement.honored(); + LLHTTPClient::post_approved(url, folder_request_body, fetcher, approved); } if (folder_request_body_lib["folders"].size()) { std::string url_lib = gAgent.getRegion()->getCapability("FetchLibDescendents2"); LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body_lib, recursive_cats); - LLHTTPClient::post_nb(url_lib, folder_request_body_lib, fetcher); - approvement.honored(); + LLHTTPClient::post_approved(url_lib, folder_request_body_lib, fetcher, approved); } } if (item_count) @@ -734,8 +732,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() body["agent_id"] = gAgent.getID(); body["items"] = item_request_body; - LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body)); - approvement.honored(); + LLHTTPClient::post_approved(url, body, new LLInventoryModelFetchItemResponder(body), approved); } } @@ -751,8 +748,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() body["agent_id"] = gAgent.getID(); body["items"] = item_request_body_lib; - LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body)); - approvement.honored(); + LLHTTPClient::post_approved(url, body, new LLInventoryModelFetchItemResponder(body), approved); } } } diff --git a/indra/newview/lltexturefetch.cpp b/indra/newview/lltexturefetch.cpp index 56a6267b2..0d58442c8 100644 --- a/indra/newview/lltexturefetch.cpp +++ b/indra/newview/lltexturefetch.cpp @@ -1272,13 +1272,14 @@ bool LLTextureFetchWorker::doWork(S32 param) } // Let AICurl decide if we can process more HTTP requests at the moment or not. - if (!AIPerService::wantsMoreHTTPRequestsFor(mPerServicePtr)) + + // AIPerService::approveHTTPRequestFor returns approvement for ONE request. + // This object keeps track of whether or not that is honored. + LLPointer approved = AIPerService::approveHTTPRequestFor(mPerServicePtr); + if (!approved) { return false ; //wait. } - // If AIPerService::wantsMoreHTTPRequestsFor returns true then it approved ONE request. - // This object keeps track of whether or not that is honored. - AIPerService::Approvement approvement(mPerServicePtr); mFetcher->removeFromNetworkQueue(this, false); @@ -1324,9 +1325,7 @@ bool LLTextureFetchWorker::doWork(S32 param) } LLHTTPClient::request(mUrl, LLHTTPClient::HTTP_GET, NULL, new HTTPGetResponder(mFetcher, mID, LLTimer::getTotalTime(), mRequestedSize, mRequestedOffset, true), - headers/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL, false); - // Now the request was added to the command queue. - approvement.honored(); + headers, approved/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL); res = true; } if (!res) From 80c8eaab2ae5b1a1fb3fb95fc8f5319f4018c22f Mon Sep 17 00:00:00 2001 From: Aleric Inglewood Date: Sun, 12 May 2013 16:51:48 +0200 Subject: [PATCH 3/5] Ask approvement for each HTTP request. The old code could do up till four requests with only one approvement. Now we just start to assemble the four types of requests until either we can get approvement for one, or one of them gets too large. This way we still request everything in the same order, and at LEAST as many per call as before, assuming we get the approvement of course. The result should actually be faster because now we will request up to 5 folders or items per capability, and not spread those 5 out over 2 to 4 capability requests. --- .../llinventorymodelbackgroundfetch.cpp | 137 ++++++++++-------- 1 file changed, 74 insertions(+), 63 deletions(-) diff --git a/indra/newview/llinventorymodelbackgroundfetch.cpp b/indra/newview/llinventorymodelbackgroundfetch.cpp index 0e26dabf5..dba9c9836 100644 --- a/indra/newview/llinventorymodelbackgroundfetch.cpp +++ b/indra/newview/llinventorymodelbackgroundfetch.cpp @@ -605,30 +605,29 @@ void LLInventoryModelBackgroundFetch::bulkFetch() LLViewerRegion* region = gAgent.getRegion(); if (gDisconnected || !region) return; - // AIPerService::approveHTTPRequestFor returns approvement for ONE request. - // The code below might fire off zero, one or even more than one requests however! - // This object keeps track of that. - LLPointer approved = AIPerService::approveHTTPRequestFor(mPerServicePtr); - if (!approved) - { - return; // Wait. - } - - U32 item_count=0; - U32 folder_count=0; - U32 max_batch_size=5; + U32 const max_batch_size = 5; U32 sort_order = gSavedSettings.getU32(LLInventoryPanel::DEFAULT_SORT_ORDER) & 0x1; uuid_vec_t recursive_cats; + U32 folder_count=0; + U32 folder_lib_count=0; + U32 item_count=0; + U32 item_lib_count=0; + + // This function can do up to four requests at once. + LLPointer approved_folder; + LLPointer approved_folder_lib; + LLPointer approved_item; + LLPointer approved_item_lib; + LLSD folder_request_body; LLSD folder_request_body_lib; LLSD item_request_body; LLSD item_request_body_lib; - while (!mFetchQueue.empty() - && (item_count + folder_count) < max_batch_size) + while (!mFetchQueue.empty()) { const FetchQueueInfo& fetch_info = mFetchQueue.front(); if (fetch_info.mIsCategory) @@ -650,10 +649,27 @@ void LLInventoryModelBackgroundFetch::bulkFetch() folder_sd["fetch_items"] = (LLSD::Boolean)TRUE; if (ALEXANDRIA_LINDEN_ID == cat->getOwnerID()) + { + if (folder_lib_count == max_batch_size || + (folder_lib_count == 0 && + !(approved_folder_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + { + break; + } folder_request_body_lib["folders"].append(folder_sd); + ++folder_lib_count; + } else + { + if (folder_count == max_batch_size || + (folder_count == 0 && + !(approved_folder = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + { + break; + } folder_request_body["folders"].append(folder_sd); - folder_count++; + ++folder_count; + } } // May already have this folder, but append child folders to list. if (fetch_info.mRecursive) @@ -683,74 +699,69 @@ void LLInventoryModelBackgroundFetch::bulkFetch() item_sd["item_id"] = itemp->getUUID(); if (itemp->getPermissions().getOwner() == gAgent.getID()) { + if (item_count == max_batch_size || + (item_count == 0 && + !(approved_item = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + { + break; + } item_request_body.append(item_sd); + ++item_count; } else { + if (item_lib_count == max_batch_size || + (item_lib_count == 0 && + !(approved_item_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + { + break; + } item_request_body_lib.append(item_sd); + ++item_lib_count; } - //itemp->fetchFromServer(); - item_count++; } } mFetchQueue.pop_front(); } - if (item_count + folder_count > 0) + if (item_count + folder_count + item_lib_count + folder_lib_count > 0) { if (folder_count) { std::string url = region->getCapability("FetchInventoryDescendents2"); - mFetchCount++; llassert(!url.empty()); - if (folder_request_body["folders"].size()) - { - LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body, recursive_cats); - LLHTTPClient::post_approved(url, folder_request_body, fetcher, approved); - } - if (folder_request_body_lib["folders"].size()) - { - std::string url_lib = gAgent.getRegion()->getCapability("FetchLibDescendents2"); - - LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body_lib, recursive_cats); - LLHTTPClient::post_approved(url_lib, folder_request_body_lib, fetcher, approved); - } + ++mFetchCount; + LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body, recursive_cats); + LLHTTPClient::post_approved(url, folder_request_body, fetcher, approved_folder); + } + if (folder_lib_count) + { + std::string url = gAgent.getRegion()->getCapability("FetchLibDescendents2"); + llassert(!url.empty()); + ++mFetchCount; + LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body_lib, recursive_cats); + LLHTTPClient::post_approved(url, folder_request_body_lib, fetcher, approved_folder_lib); } if (item_count) { - std::string url; - - if (item_request_body.size()) - { - mFetchCount++; - url = region->getCapability("FetchInventory2"); - llassert(!url.empty()); - if (!url.empty()) - { - LLSD body; - body["agent_id"] = gAgent.getID(); - body["items"] = item_request_body; - - LLHTTPClient::post_approved(url, body, new LLInventoryModelFetchItemResponder(body), approved); - } - } - - if (item_request_body_lib.size()) - { - mFetchCount++; - - url = region->getCapability("FetchLib2"); - llassert(!url.empty()); - if (!url.empty()) - { - LLSD body; - body["agent_id"] = gAgent.getID(); - body["items"] = item_request_body_lib; - - LLHTTPClient::post_approved(url, body, new LLInventoryModelFetchItemResponder(body), approved); - } - } + std::string url = region->getCapability("FetchInventory2"); + llassert(!url.empty()); + ++mFetchCount; + LLSD body; + body["agent_id"] = gAgent.getID(); + body["items"] = item_request_body; + LLHTTPClient::post_approved(url, body, new LLInventoryModelFetchItemResponder(body), approved_item); + } + if (item_lib_count) + { + std::string url = region->getCapability("FetchLib2"); + llassert(!url.empty()); + ++mFetchCount; + LLSD body; + body["agent_id"] = gAgent.getID(); + body["items"] = item_request_body_lib; + LLHTTPClient::post_approved(url, body, new LLInventoryModelFetchItemResponder(body), approved_item_lib); } mFetchTimer.reset(); } From f8aac1f3ddcb3395fc25cd1574aa3aace7fa07da Mon Sep 17 00:00:00 2001 From: Aleric Inglewood Date: Sun, 12 May 2013 17:56:29 +0200 Subject: [PATCH 4/5] Lets CurlEasyHandle::approved() stay consisten throughout its lifetime. --- indra/llmessage/aicurlprivate.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indra/llmessage/aicurlprivate.h b/indra/llmessage/aicurlprivate.h index db174445a..90b04ad8f 100644 --- a/indra/llmessage/aicurlprivate.h +++ b/indra/llmessage/aicurlprivate.h @@ -140,7 +140,7 @@ class CurlEasyHandle : public boost::noncopyable, protected AICurlEasyHandleEven // and revoking of the callbacks is harmless (and happens for the raw non-statemachine version). void remove_queued(void) { mQueuedForRemoval = true; } // In case it's added after being removed. - void add_queued(void) { mQueuedForRemoval = false; if (mApproved) { mApproved->honored(); mApproved = NULL; } } + void add_queued(void) { mQueuedForRemoval = false; if (mApproved) { mApproved->honored(); } } #ifdef DEBUG_CURLIO void debug(bool debug) { if (mDebug) debug_curl_remove_easy(mEasyHandle); if (debug) debug_curl_add_easy(mEasyHandle); mDebug = debug; } From 67e88561dc91c42afd47874ccdcf7573da4eebdb Mon Sep 17 00:00:00 2001 From: Aleric Inglewood Date: Sun, 12 May 2013 18:10:49 +0200 Subject: [PATCH 5/5] Queue/throttle fix. If AIPerService::add_queued_to fails because a new request is throttled, then do not add the request to the end of the queue, nor remove it from the queue: do nothing: it makes no sense to move the request to the back because they all belong to the same service and all of them will be either throttled or not. Note: Still need to fix that in this case we should look in queues of other services. --- indra/llmessage/aicurlperservice.cpp | 6 +++++- indra/llmessage/aicurlthread.cpp | 12 +++++++++--- indra/llmessage/aicurlthread.h | 2 +- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/indra/llmessage/aicurlperservice.cpp b/indra/llmessage/aicurlperservice.cpp index 8cf62bc2d..e97a18969 100644 --- a/indra/llmessage/aicurlperservice.cpp +++ b/indra/llmessage/aicurlperservice.cpp @@ -287,7 +287,11 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle) { if (!mQueuedRequests.empty()) { - multi_handle->add_easy_request(mQueuedRequests.front()); + if (!multi_handle->add_easy_request(mQueuedRequests.front(), true)) + { + // Throttled. + return; + } mQueuedRequests.pop_front(); if (mQueuedRequests.empty()) { diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index e0e89d429..70c041a1f 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -1331,7 +1331,7 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w) case cmd_boost: // FIXME: future stuff break; case cmd_add: - multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request())); + multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), false); PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue(); break; case cmd_remove: @@ -1701,7 +1701,7 @@ CURLMsg const* MultiHandle::info_read(int* msgs_in_queue) const static U32 curl_max_total_concurrent_connections = 32; // Initialized on start up by startCurlThread(). -void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) +bool MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request, bool from_queue) { bool throttled = true; // Default. AIPerServicePtr per_service; @@ -1731,7 +1731,12 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) llassert(sTotalAdded == mAddedEasyRequests.size()); Dout(dc::curl, "MultiHandle::add_easy_request: Added AICurlEasyRequest " << (void*)easy_request.get_ptr().get() << "; now processing " << mAddedEasyRequests.size() << " easy handles [running_handles = " << AICurlInterface::Stats::running_handles << "]."); - return; + return true; + } + if (from_queue) + { + // Throttled. Do not add to queue, because it is already in the queue. + return false; } // The request could not be added, we have to queue it. PerService_wat(*per_service)->queue(easy_request); @@ -1739,6 +1744,7 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) // Not active yet, but it's no longer an error if next we try to remove the request. AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = false; #endif + return true; } CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request, bool as_per_command) diff --git a/indra/llmessage/aicurlthread.h b/indra/llmessage/aicurlthread.h index 9f71b015c..9a4c2b487 100644 --- a/indra/llmessage/aicurlthread.h +++ b/indra/llmessage/aicurlthread.h @@ -59,7 +59,7 @@ class MultiHandle : public CurlMultiHandle ~MultiHandle(); // Add/remove an easy handle to/from a multi session. - void add_easy_request(AICurlEasyRequest const& easy_request); + bool add_easy_request(AICurlEasyRequest const& easy_request, bool from_queue); CURLMcode remove_easy_request(AICurlEasyRequest const& easy_request, bool as_per_command = false); // Reads/writes available data from a particular socket (non-blocking).