From cf4c4a72c2319ea368ec7b11d99247c836ca414c Mon Sep 17 00:00:00 2001 From: Aleric Inglewood Date: Tue, 21 May 2013 23:34:23 +0200 Subject: [PATCH] Added AICapabilityType and related. This splits the AIPerService queue up into four queues: one for each "capability type". On Second Life that doesn't make a difference in itself for textures because the texture service only serves one capability type: textures. Other services however can serve two types, while on Avination - that currently only has one services for everything - this really makes a difference because that single service now has four queues. More importantly however is that the administration of how many requests are in the "pipeline" (from approving that a new HTTP request may be added for given service, till curl finished it) is now per capability type (or service/capabitity type pair actually). This means downloads of a certain capability type (textures, inventory, mesh, other) will no longer stall because unapproved requests cluttered the queue for a given service. Moreover, before when a request did finished, it would only look for a new request in the queue of the service that just finished. This simple algorithm worked when there were no 'PerSerice' objects, and only one 'Curl' queue: because if anything was queued that that was because there were running requests, and when one of those running requests finished it made sense to see if one of those queued requests could be added now. However, after adding multiple queues, one for each service, it could happen that service A had queued requests while only requests from service B were actually running: only requests of B would ever finish and the requests of A would be queued forever. With this patch the algorithm is to look alternating first in the texture request queue and then in the inventory request queue - or vice versa, and if there are none of those, look for a request of a different type. If also that cannot be found, look for a request in another service. This is still not optimal and subject to change. --- indra/llmessage/aicurl.cpp | 9 +- indra/llmessage/aicurlperservice.cpp | 220 ++++++++++++------ indra/llmessage/aicurlperservice.h | 70 ++++-- indra/llmessage/aicurlprivate.h | 8 +- indra/llmessage/aicurlthread.cpp | 72 +++--- indra/llmessage/llhttpclient.h | 3 + indra/newview/llinventorymodel.h | 1 + .../llinventorymodelbackgroundfetch.cpp | 9 +- indra/newview/llmeshrepository.cpp | 5 + indra/newview/lltexturefetch.cpp | 9 +- 10 files changed, 273 insertions(+), 133 deletions(-) diff --git a/indra/llmessage/aicurl.cpp b/indra/llmessage/aicurl.cpp index 248568d7e..e5dcb6ae8 100644 --- a/indra/llmessage/aicurl.cpp +++ b/indra/llmessage/aicurl.cpp @@ -1248,10 +1248,10 @@ AIPerServicePtr CurlEasyRequest::getPerServicePtr(void) return mPerServicePtr; } -bool CurlEasyRequest::removeFromPerServiceQueue(AICurlEasyRequest const& easy_request) const +bool CurlEasyRequest::removeFromPerServiceQueue(AICurlEasyRequest const& easy_request, AICapabilityType capability_type) const { // Note that easy_request (must) represent(s) this object; it's just passed for convenience. - return mPerServicePtr && PerService_wat(*mPerServicePtr)->cancel(easy_request); + return mPerServicePtr && PerService_wat(*mPerServicePtr)->cancel(easy_request, capability_type); } std::string CurlEasyRequest::getLowercaseHostname(void) const @@ -1270,7 +1270,7 @@ bool BufferedCurlEasyRequest::sShuttingDown = false; AIAverage BufferedCurlEasyRequest::sHTTPBandwidth(25); BufferedCurlEasyRequest::BufferedCurlEasyRequest() : - mRequestTransferedBytes(0), mTotalRawBytes(0), mStatus(HTTP_INTERNAL_ERROR_OTHER), mBufferEventsTarget(NULL) + mRequestTransferedBytes(0), mTotalRawBytes(0), mStatus(HTTP_INTERNAL_ERROR_OTHER), mBufferEventsTarget(NULL), mCapabilityType(number_of_capability_types) { AICurlInterface::Stats::BufferedCurlEasyRequest_count++; } @@ -1402,6 +1402,9 @@ void BufferedCurlEasyRequest::prepRequest(AICurlEasyRequest_wat& curl_easy_reque // Keep responder alive. mResponder = responder; + // Cache capability type, because it will be needed even after the responder was removed. + mCapabilityType = responder->capability_type(); + // Send header events to responder if needed. if (mResponder->needsHeaders()) { diff --git a/indra/llmessage/aicurlperservice.cpp b/indra/llmessage/aicurlperservice.cpp index e97a18969..ff452c358 100644 --- a/indra/llmessage/aicurlperservice.cpp +++ b/indra/llmessage/aicurlperservice.cpp @@ -71,14 +71,24 @@ void intrusive_ptr_release(RefCountedThreadSafePerService* per_service) using namespace AICurlPrivate; AIPerService::AIPerService(void) : - mApprovedRequests(0), mQueuedCommands(0), mAdded(0), mQueueEmpty(false), - mQueueFull(false), mRequestStarvation(false), mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms. + mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms. mConcurrectConnections(CurlConcurrentConnectionsPerService), + mTotalAdded(0), + mApprovedFirst(0), + mUnapprovedFirst(0) +{ +} + +AIPerService::CapabilityType::CapabilityType(void) : + mApprovedRequests(0), + mQueuedCommands(0), + mAdded(0), + mFlags(0), mMaxPipelinedRequests(CurlConcurrentConnectionsPerService) { } -AIPerService::~AIPerService() +AIPerService::CapabilityType::~CapabilityType() { } @@ -212,12 +222,20 @@ void AIPerService::release(AIPerServicePtr& instance) // Therefore, recheck the condition now that we have locked sInstanceMap. if (!instance->exactly_two_left()) { - // Some other thread added this host in the meantime. + // Some other thread added this service in the meantime. return; } - // The reference in the map is the last one; that means there can't be any curl easy requests queued for this host. - llassert(PerService_rat(*instance)->mQueuedRequests.empty()); - // Find the host and erase it from the map. +#ifdef SHOW_ASSERT + { + // The reference in the map is the last one; that means there can't be any curl easy requests queued for this service. + PerService_rat per_service_r(*instance); + for (int i = 0; i < number_of_capability_types; ++i) + { + llassert(per_service_r->mCapabilityType[i].mQueuedRequests.empty()); + } + } +#endif + // Find the service and erase it from the map. iterator const end = instance_map_w->end(); for(iterator iter = instance_map_w->begin(); iter != end; ++iter) { @@ -228,7 +246,7 @@ void AIPerService::release(AIPerServicePtr& instance) return; } } - // We should always find the host. + // We should always find the service. llassert(false); } instance.reset(); @@ -236,30 +254,32 @@ void AIPerService::release(AIPerServicePtr& instance) bool AIPerService::throttled() const { - return mAdded >= mConcurrectConnections; + return mTotalAdded >= mConcurrectConnections; } -void AIPerService::added_to_multi_handle(void) +void AIPerService::added_to_multi_handle(AICapabilityType capability_type) { - ++mAdded; + ++mCapabilityType[capability_type].mAdded; + ++mTotalAdded; } -void AIPerService::removed_from_multi_handle(void) +void AIPerService::removed_from_multi_handle(AICapabilityType capability_type) { - --mAdded; - llassert(mAdded >= 0); + --mCapabilityType[capability_type].mAdded; + --mTotalAdded; + llassert(mTotalAdded >= 0 && mCapabilityType[capability_type].mAdded >= 0); } -void AIPerService::queue(AICurlEasyRequest const& easy_request) +void AIPerService::queue(AICurlEasyRequest const& easy_request, AICapabilityType capability_type) { - mQueuedRequests.push_back(easy_request.get_ptr()); + mCapabilityType[capability_type].mQueuedRequests.push_back(easy_request.get_ptr()); TotalQueued_wat(sTotalQueued)->count++; } -bool AIPerService::cancel(AICurlEasyRequest const& easy_request) +bool AIPerService::cancel(AICurlEasyRequest const& easy_request, AICapabilityType capability_type) { - queued_request_type::iterator const end = mQueuedRequests.end(); - queued_request_type::iterator cur = std::find(mQueuedRequests.begin(), end, easy_request.get_ptr()); + CapabilityType::queued_request_type::iterator const end = mCapabilityType[capability_type].mQueuedRequests.end(); + CapabilityType::queued_request_type::iterator cur = std::find(mCapabilityType[capability_type].mQueuedRequests.begin(), end, easy_request.get_ptr()); if (cur == end) return false; // Not found. @@ -270,62 +290,122 @@ bool AIPerService::cancel(AICurlEasyRequest const& easy_request) // want to break the order in which requests where added). Swap is also not // thread-safe, but OK here because it only touches the objects in the deque, // and the deque is protected by the lock on the AIPerService object. - queued_request_type::iterator prev = cur; + CapabilityType::queued_request_type::iterator prev = cur; while (++cur != end) { prev->swap(*cur); // This is safe, prev = cur; } - mQueuedRequests.pop_back(); // if this is safe. + mCapabilityType[capability_type].mQueuedRequests.pop_back(); // if this is safe. TotalQueued_wat total_queued_w(sTotalQueued); total_queued_w->count--; llassert(total_queued_w->count >= 0); return true; } -void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle) +void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle, bool recursive) { - if (!mQueuedRequests.empty()) + int order[number_of_capability_types]; + // The first two types are approved types, they should be the first to try. + // Try the one that has the largest queue first, if they the queues have equal size, try mApprovedFirst first. + size_t s0 = mCapabilityType[0].mQueuedRequests.size(); + size_t s1 = mCapabilityType[1].mQueuedRequests.size(); + if (s0 == s1) { - if (!multi_handle->add_easy_request(mQueuedRequests.front(), true)) - { - // Throttled. - return; - } - mQueuedRequests.pop_front(); - if (mQueuedRequests.empty()) - { - // We obtained a request from the queue, and after that there we no more request in the queue of this host. - mQueueEmpty = true; - } - else - { - // We obtained a request from the queue, and even after that there was at least one more request in the queue of this host. - mQueueFull = true; - } - TotalQueued_wat total_queued_w(sTotalQueued); - llassert(total_queued_w->count > 0); - if (!--(total_queued_w->count)) - { - // We obtained a request from the queue, and after that there we no more request in any queue. - total_queued_w->empty = true; - } - else - { - // We obtained a request from the queue, and even after that there was at least one more request in some queue. - total_queued_w->full = true; - } + order[0] = mApprovedFirst; + mApprovedFirst = 1 - mApprovedFirst; + order[1] = mApprovedFirst; + } + else if (s0 > s1) + { + order[0] = 0; + order[1] = 1; } else { - // We can add a new request, but there is none in the queue! - mRequestStarvation = true; - TotalQueued_wat total_queued_w(sTotalQueued); - if (total_queued_w->count == 0) + order[0] = 1; + order[1] = 0; + } + // The next two types are unapproved types. Here, try them alternating regardless of queue size. + int n = mUnapprovedFirst; + for (int i = 2; i < number_of_capability_types; ++i, n = (n + 1) % (number_of_capability_types - 2)) + { + order[i] = 2 + n; + } + mUnapprovedFirst = (mUnapprovedFirst + 1) % (number_of_capability_types - 2); + + for (int i = 0; i < number_of_capability_types; ++i) + { + CapabilityType& ct(mCapabilityType[order[i]]); + if (!ct.mQueuedRequests.empty()) { - // The queue of every host is empty! - total_queued_w->starvation = true; + if (!multi_handle->add_easy_request(ct.mQueuedRequests.front(), true)) + { + // Throttled. If this failed then every capability type will fail: we either are using too much bandwidth, or too many total connections. + // However, it MAY be that this service was thottled for using too much bandwidth by itself. Look if other services can be added. + break; + } + // Request was added, remove it from the queue. + ct.mQueuedRequests.pop_front(); + if (ct.mQueuedRequests.empty()) + { + // We obtained a request from the queue, and after that there we no more request in the queue of this service. + ct.mFlags |= ctf_empty; + } + else + { + // We obtained a request from the queue, and even after that there was at least one more request in the queue of this service. + ct.mFlags |= ctf_full; + } + TotalQueued_wat total_queued_w(sTotalQueued); + llassert(total_queued_w->count > 0); + if (!--(total_queued_w->count)) + { + // We obtained a request from the queue, and after that there we no more request in any queue. + total_queued_w->empty = true; + } + else + { + // We obtained a request from the queue, and even after that there was at least one more request in some queue. + total_queued_w->full = true; + } + // We added something from a queue, so we're done. + return; } + else + { + // We could add a new request, but there is none in the queue! + // Note that if this service does not serve this capability type, + // then obviously this queue was empty; however, in that case + // this variable will never be looked at, so it's ok to set it. + ct.mFlags |= ctf_starvation; + } + if (i == number_of_capability_types - 1) + { + // Last entry also empty. All queues of this service were empty. Check total connections. + TotalQueued_wat total_queued_w(sTotalQueued); + if (total_queued_w->count == 0) + { + // The queue of every service is empty! + total_queued_w->starvation = true; + return; + } + } + } + if (recursive) + { + return; + } + // Nothing from this service could be added, try other services. + instance_map_wat instance_map_w(sInstanceMap); + for (iterator service = instance_map_w->begin(); service != instance_map_w->end(); ++service) + { + PerService_wat per_service_w(*service->second); + if (&*per_service_w == this) + { + continue; + } + per_service_w->add_queued_to(multi_handle, true); } } @@ -333,15 +413,18 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle) void AIPerService::purge(void) { instance_map_wat instance_map_w(sInstanceMap); - for (iterator host = instance_map_w->begin(); host != instance_map_w->end(); ++host) + for (iterator service = instance_map_w->begin(); service != instance_map_w->end(); ++service) { - Dout(dc::curl, "Purging queue of host \"" << host->first << "\"."); - PerService_wat per_service_w(*host->second); - size_t s = per_service_w->mQueuedRequests.size(); - per_service_w->mQueuedRequests.clear(); + Dout(dc::curl, "Purging queues of service \"" << service->first << "\"."); + PerService_wat per_service_w(*service->second); TotalQueued_wat total_queued_w(sTotalQueued); - total_queued_w->count -= s; - llassert(total_queued_w->count >= 0); + for (int i = 0; i < number_of_capability_types; ++i) + { + size_t s = per_service_w->mCapabilityType[i].mQueuedRequests.size(); + per_service_w->mCapabilityType[i].mQueuedRequests.clear(); + total_queued_w->count -= s; + llassert(total_queued_w->count >= 0); + } } } @@ -355,7 +438,10 @@ void AIPerService::adjust_concurrent_connections(int increment) U32 old_concurrent_connections = per_service_w->mConcurrectConnections; per_service_w->mConcurrectConnections = llclamp(old_concurrent_connections + increment, (U32)1, CurlConcurrentConnectionsPerService); increment = per_service_w->mConcurrectConnections - old_concurrent_connections; - per_service_w->mMaxPipelinedRequests = llmax(per_service_w->mMaxPipelinedRequests + increment, 0); + for (int i = 0; i < number_of_capability_types; ++i) + { + per_service_w->mCapabilityType[i].mMaxPipelinedRequests = llmax(per_service_w->mCapabilityType[i].mMaxPipelinedRequests + increment, (U32)0); + } } } @@ -365,8 +451,8 @@ void AIPerService::Approvement::honored(void) { mHonored = true; AICurlPrivate::PerService_wat per_service_w(*mPerServicePtr); - llassert(per_service_w->mApprovedRequests > 0); - per_service_w->mApprovedRequests--; + llassert(per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests > 0); + per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests--; } } diff --git a/indra/llmessage/aicurlperservice.h b/indra/llmessage/aicurlperservice.h index 5bcc28063..f50c1f393 100644 --- a/indra/llmessage/aicurlperservice.h +++ b/indra/llmessage/aicurlperservice.h @@ -72,6 +72,18 @@ typedef AIAccess PerService_wat; // Therefore, use an intrusive pointer for the threadsafe type. typedef boost::intrusive_ptr AIPerServicePtr; +//----------------------------------------------------------------------------- +// + +enum AICapabilityType { // {Capabilities} [Responders] + cap_texture = 0, // GetTexture [HTTPGetResponder] + cap_inventory = 1, // { FetchInventory2, FetchLib2 } [LLInventoryModel::fetchInventoryResponder], { FetchInventoryDescendents2, FetchLibDescendents2 } [LLInventoryModelFetchDescendentsResponder] + cap_mesh = 2, // GetMesh [LLMeshSkinInfoResponder, LLMeshDecompositionResponder, LLMeshPhysicsShapeResponder, LLMeshHeaderResponder, LLMeshLODResponder] + cap_other = 3, // All other capabilities + + number_of_capability_types = 4 +}; + //----------------------------------------------------------------------------- // AIPerService @@ -92,9 +104,6 @@ class AIPerService { friend class AIThreadSafeSimpleDC; // threadsafe_PerService AIPerService(void); - public: - ~AIPerService(); - public: typedef instance_map_type::iterator iterator; typedef instance_map_type::const_iterator const_iterator; @@ -112,20 +121,36 @@ class AIPerService { static void purge(void); private: - typedef std::deque queued_request_type; + static U16 const ctf_empty = 1; + static U16 const ctf_full = 2; + static U16 const ctf_starvation = 4; - int mApprovedRequests; // The number of approved requests by approveHTTPRequestFor that were not added to the command queue yet. - int mQueuedCommands; // Number of add commands (minus remove commands) with this host in the command queue. - int mAdded; // Number of active easy handles with this host. - queued_request_type mQueuedRequests; // Waiting (throttled) requests. + struct CapabilityType { + typedef std::deque queued_request_type; - bool mQueueEmpty; // Set to true when the queue becomes precisely empty. - bool mQueueFull; // Set to true when the queue is popped and then still isn't empty; - bool mRequestStarvation; // Set to true when the queue was about to be popped but was already empty. + queued_request_type mQueuedRequests; // Waiting (throttled) requests. + U16 mApprovedRequests; // The number of approved requests by approveHTTPRequestFor that were not added to the command queue yet. + U16 mQueuedCommands; // Number of add commands (minus remove commands), for this service, in the command queue. + U16 mAdded; // Number of active easy handles with this service. + U16 mFlags; // ctf_empty: Set to true when the queue becomes precisely empty. + // ctf_full : Set to true when the queue is popped and then still isn't empty; + // ctf_starvation: Set to true when the queue was about to be popped but was already empty. + U32 mMaxPipelinedRequests; // The maximum number of accepted requests for this service and (approved) capability type, that didn't finish yet. + + // Declare, not define, constructor and destructor - in order to avoid instantiation of queued_request_type from header. + CapabilityType(void); + ~CapabilityType(); + + S32 pipelined_requests(void) const { return mApprovedRequests + mQueuedCommands + mQueuedRequests.size() + mAdded; } + }; + + CapabilityType mCapabilityType[number_of_capability_types]; AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second. int mConcurrectConnections; // The maximum number of allowed concurrent connections to this service. - int mMaxPipelinedRequests; // The maximum number of accepted requests for this service that didn't finish yet. + int mTotalAdded; // Number of active easy handles with this host. + int mApprovedFirst; // First capability type to try. + int mUnapprovedFirst; // First capability type to try after all approved types were tried. // Global administration of the total number of queued requests of all services combined. private: @@ -176,20 +201,20 @@ class AIPerService { static bool sNoHTTPBandwidthThrottling; // Global override to disable bandwidth throttling. public: - void added_to_command_queue(void) { ++mQueuedCommands; } - void removed_from_command_queue(void) { --mQueuedCommands; llassert(mQueuedCommands >= 0); } - void added_to_multi_handle(void); // Called when an easy handle for this host has been added to the multi handle. - void removed_from_multi_handle(void); // Called when an easy handle for this host is removed again from the multi handle. + void added_to_command_queue(AICapabilityType capability_type) { ++mCapabilityType[capability_type].mQueuedCommands; } + void removed_from_command_queue(AICapabilityType capability_type) { --mCapabilityType[capability_type].mQueuedCommands; llassert(mCapabilityType[capability_type].mQueuedCommands >= 0); } + void added_to_multi_handle(AICapabilityType capability_type); // Called when an easy handle for this host has been added to the multi handle. + void removed_from_multi_handle(AICapabilityType capability_type); // Called when an easy handle for this host is removed again from the multi handle. bool throttled(void) const; // Returns true if the maximum number of allowed requests for this host have been added to the multi handle. - void queue(AICurlEasyRequest const& easy_request); // Add easy_request to the queue. - bool cancel(AICurlEasyRequest const& easy_request); // Remove easy_request from the queue (if it's there). + void queue(AICurlEasyRequest const& easy_request, AICapabilityType capability_type); // Add easy_request to the queue. + bool cancel(AICurlEasyRequest const& easy_request, AICapabilityType capability_type); // Remove easy_request from the queue (if it's there). - void add_queued_to(AICurlPrivate::curlthread::MultiHandle* mh); + void add_queued_to(AICurlPrivate::curlthread::MultiHandle* mh, bool recursive = false); // Add queued easy handle (if any) to the multi handle. The request is removed from the queue, // followed by either a call to added_to_multi_handle() or to queue() to add it back. - S32 pipelined_requests(void) const { return mApprovedRequests + mQueuedCommands + mQueuedRequests.size() + mAdded; } + S32 pipelined_requests(AICapabilityType capability_type) const { return mCapabilityType[capability_type].pipelined_requests(); } AIAverage& bandwidth(void) { return mHTTPBandwidth; } AIAverage const& bandwidth(void) const { return mHTTPBandwidth; } @@ -205,9 +230,10 @@ class AIPerService { class Approvement : public LLThreadSafeRefCount { private: AIPerServicePtr mPerServicePtr; + AICapabilityType mCapabilityType; bool mHonored; public: - Approvement(AIPerServicePtr const& per_service) : mPerServicePtr(per_service), mHonored(false) { } + Approvement(AIPerServicePtr const& per_service, AICapabilityType capability_type) : mPerServicePtr(per_service), mCapabilityType(capability_type), mHonored(false) { } ~Approvement() { if (!mHonored) not_honored(); } void honored(void); void not_honored(void); @@ -221,7 +247,7 @@ class AIPerService { // Returns approvement if curl can handle another request for this host. // Should return NULL if the maximum allowed HTTP bandwidth is reached, or when // the latency between request and actual delivery becomes too large. - static Approvement* approveHTTPRequestFor(AIPerServicePtr const& per_service); + static Approvement* approveHTTPRequestFor(AIPerServicePtr const& per_service, AICapabilityType capability_type); // Return true if too much bandwidth is being used. static bool checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms); diff --git a/indra/llmessage/aicurlprivate.h b/indra/llmessage/aicurlprivate.h index 90b04ad8f..aa5a1ba7b 100644 --- a/indra/llmessage/aicurlprivate.h +++ b/indra/llmessage/aicurlprivate.h @@ -358,8 +358,8 @@ class CurlEasyRequest : public CurlEasyHandle { // PerService API. AIPerServicePtr getPerServicePtr(void); // (Optionally create and) return a pointer to the unique // AIPerService corresponding to mLowercaseServicename. - bool removeFromPerServiceQueue(AICurlEasyRequest const&) const; // Remove this request from the per-host queue, if queued at all. - // Returns true if it was queued. + bool removeFromPerServiceQueue(AICurlEasyRequest const&, AICapabilityType capability_type) const; // Remove this request from the per-host queue, if queued at all. + // Returns true if it was queued. protected: // Pass events to parent. /*virtual*/ void added_to_multi_handle(AICurlEasyRequest_wat& curl_easy_request_w); @@ -417,6 +417,7 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { U8* mLastRead; // Pointer into mInput where we last stopped reading (or NULL to start at the beginning). buffer_ptr_t mOutput; LLHTTPClient::ResponderPtr mResponder; + AICapabilityType mCapabilityType; //U32 mBodyLimit; // From the old LLURLRequestDetail::mBodyLimit, but never used. U32 mStatus; // HTTP status, decoded from the first header line. std::string mReason; // The "reason" from the same header line. @@ -459,6 +460,9 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { // Return true when prepRequest was already called and the object has not been // invalidated as a result of calling timed_out(). bool isValid(void) const { return mResponder; } + + // Return the capability type of this request. + AICapabilityType capability_type(void) const { llassert(mCapabilityType != number_of_capability_types); return mCapabilityType; } }; inline ThreadSafeBufferedCurlEasyRequest* CurlEasyRequest::get_lockobj(void) diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index 70c041a1f..2611ef7bf 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -1325,19 +1325,30 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w) // Access command_being_processed only. { command_being_processed_rat command_being_processed_r(command_being_processed); + AICapabilityType capability_type; + AIPerServicePtr per_service; + { + AICurlEasyRequest_wat easy_request_w(*command_being_processed_r->easy_request()); + capability_type = easy_request_w->capability_type(); + per_service = easy_request_w->getPerServicePtr(); + } switch(command_being_processed_r->command()) { case cmd_none: case cmd_boost: // FIXME: future stuff break; case cmd_add: + { multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), false); - PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue(); + PerService_wat(*per_service)->removed_from_command_queue(capability_type); break; + } case cmd_remove: - PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'. + { + PerService_wat(*per_service)->added_to_command_queue(capability_type); // Not really, but this has the same effect as 'removed a remove command'. multi_handle_w->remove_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), true); break; + } } // Done processing. command_being_processed_wat command_being_processed_w(command_being_processed_r); @@ -1704,18 +1715,21 @@ static U32 curl_max_total_concurrent_connections = 32; // Initialized on st bool MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request, bool from_queue) { bool throttled = true; // Default. + AICapabilityType capability_type; AIPerServicePtr per_service; { AICurlEasyRequest_wat curl_easy_request_w(*easy_request); + capability_type = curl_easy_request_w->capability_type(); per_service = curl_easy_request_w->getPerServicePtr(); - bool too_much_bandwidth = !curl_easy_request_w->approved() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms); + // Never throttle on bandwidth if there are no handles running (sTotalAdded == 1, the long poll connection). + bool too_much_bandwidth = sTotalAdded > 1 && !curl_easy_request_w->approved() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms); PerService_wat per_service_w(*per_service); - if (!too_much_bandwidth && mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled()) + if (!too_much_bandwidth && sTotalAdded < curl_max_total_concurrent_connections && !per_service_w->throttled()) { curl_easy_request_w->set_timeout_opts(); if (curl_easy_request_w->add_handle_to_multi(curl_easy_request_w, mMultiHandle) == CURLM_OK) { - per_service_w->added_to_multi_handle(); // (About to be) added to mAddedEasyRequests. + per_service_w->added_to_multi_handle(capability_type); // (About to be) added to mAddedEasyRequests. throttled = false; // Fall through... } } @@ -1739,7 +1753,7 @@ bool MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request, bool f return false; } // The request could not be added, we have to queue it. - PerService_wat(*per_service)->queue(easy_request); + PerService_wat(*per_service)->queue(easy_request, capability_type); #ifdef SHOW_ASSERT // Not active yet, but it's no longer an error if next we try to remove the request. AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = false; @@ -1757,7 +1771,7 @@ CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request #ifdef SHOW_ASSERT bool removed = #endif - easy_request_w->removeFromPerServiceQueue(easy_request); + easy_request_w->removeFromPerServiceQueue(easy_request, easy_request_w->capability_type()); #ifdef SHOW_ASSERT if (removed) { @@ -1773,12 +1787,14 @@ CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator const& iter, bool as_per_command) { CURLMcode res; + AICapabilityType capability_type; AIPerServicePtr per_service; { AICurlEasyRequest_wat curl_easy_request_w(**iter); res = curl_easy_request_w->remove_handle_from_multi(curl_easy_request_w, mMultiHandle); + capability_type = curl_easy_request_w->capability_type(); per_service = curl_easy_request_w->getPerServicePtr(); - PerService_wat(*per_service)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests. + PerService_wat(*per_service)->removed_from_multi_handle(capability_type); // (About to be) removed from mAddedEasyRequests. #ifdef SHOW_ASSERT curl_easy_request_w->mRemovedPerCommand = as_per_command; #endif @@ -2429,7 +2445,7 @@ void AICurlEasyRequest::addRequest(void) command_queue_w->commands.push_back(Command(*this, cmd_add)); command_queue_w->size++; AICurlEasyRequest_wat curl_easy_request_w(*get()); - PerService_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue(); + PerService_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue(curl_easy_request_w->capability_type()); curl_easy_request_w->add_queued(); } // Something was added to the queue, wake up the thread to get it. @@ -2493,7 +2509,7 @@ void AICurlEasyRequest::removeRequest(void) command_queue_w->commands.push_back(Command(*this, cmd_remove)); command_queue_w->size--; AICurlEasyRequest_wat curl_easy_request_w(*get()); - PerService_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(); // Note really, but this has the same effect as 'added a remove command'. + PerService_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(curl_easy_request_w->capability_type()); // Note really, but this has the same effect as 'added a remove command'. // Suppress warning that would otherwise happen if the callbacks are revoked before the curl thread removed the request. curl_easy_request_w->remove_queued(); } @@ -2614,7 +2630,7 @@ bool AIPerService::sNoHTTPBandwidthThrottling; // running requests (in MultiHandle::mAddedEasyRequests)). // //static -AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr const& per_service) +AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr const& per_service, AICapabilityType capability_type) { using namespace AICurlPrivate; using namespace AICurlPrivate::curlthread; @@ -2650,31 +2666,30 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c } } - // Check if it's ok to get a new request for this particular service and update the per-service threshold. + // Check if it's ok to get a new request for this particular capability type and update the per-capability-type threshold. bool reject, equal, increment_threshold; { PerService_wat per_service_w(*per_service); - S32 const pipelined_requests_per_service = per_service_w->pipelined_requests(); - //llassert(pipelined_requests_per_service >= 0 && pipelined_requests_per_service <= 16); - reject = pipelined_requests_per_service >= per_service_w->mMaxPipelinedRequests; - equal = pipelined_requests_per_service == per_service_w->mMaxPipelinedRequests; - increment_threshold = per_service_w->mRequestStarvation; - decrement_threshold = per_service_w->mQueueFull && !per_service_w->mQueueEmpty; - // Reset flags. - per_service_w->mQueueFull = per_service_w->mQueueEmpty = per_service_w->mRequestStarvation = false; + CapabilityType& ct(per_service_w->mCapabilityType[capability_type]); + S32 const pipelined_requests_per_capability_type = ct.pipelined_requests(); + reject = pipelined_requests_per_capability_type >= ct.mMaxPipelinedRequests; + equal = pipelined_requests_per_capability_type == ct.mMaxPipelinedRequests; + increment_threshold = ct.mFlags & ctf_starvation; + decrement_threshold = (ct.mFlags & (ctf_empty | ctf_full)) == ctf_full; + ct.mFlags = 0; if (decrement_threshold) { - if (per_service_w->mMaxPipelinedRequests > per_service_w->mConcurrectConnections) + if (ct.mMaxPipelinedRequests > per_service_w->mConcurrectConnections) { - per_service_w->mMaxPipelinedRequests--; + ct.mMaxPipelinedRequests--; } } else if (increment_threshold && reject) { - if (per_service_w->mMaxPipelinedRequests < 2 * per_service_w->mConcurrectConnections) + if (ct.mMaxPipelinedRequests < 2 * per_service_w->mConcurrectConnections) { - per_service_w->mMaxPipelinedRequests++; + ct.mMaxPipelinedRequests++; // Immediately take the new threshold into account. reject = !equal; } @@ -2683,8 +2698,7 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c { // Before releasing the lock on per_service, stop other threads from getting a // too small value from pipelined_requests() and approving too many requests. - per_service_w->mApprovedRequests++; - //llassert(per_service_w->mApprovedRequests <= 16); + ct.mApprovedRequests++; } } if (reject) @@ -2697,7 +2711,7 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c if (checkBandwidthUsage(per_service, sTime_40ms)) { // Too much bandwidth is being used, either in total or for this service. - PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all. + PerService_wat(*per_service)->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. return NULL; } @@ -2731,10 +2745,10 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c } if (reject) { - PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all. + PerService_wat(*per_service)->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. return NULL; } - return new Approvement(per_service); + return new Approvement(per_service, capability_type); } bool AIPerService::checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms) diff --git a/indra/llmessage/llhttpclient.h b/indra/llmessage/llhttpclient.h index e376530e5..af11a6687 100644 --- a/indra/llmessage/llhttpclient.h +++ b/indra/llmessage/llhttpclient.h @@ -224,6 +224,9 @@ public: // If this function returns false then we generate an error when a redirect status (300..399) is received. virtual bool redirect_status_ok(void) const { return followRedir(); } + // Returns the capability type used by this responder. + virtual AICapabilityType capability_type(void) const { return cap_other; } + // Timeout policy to use. virtual AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const = 0; diff --git a/indra/newview/llinventorymodel.h b/indra/newview/llinventorymodel.h index 3f87c2615..a73a1fee6 100644 --- a/indra/newview/llinventorymodel.h +++ b/indra/newview/llinventorymodel.h @@ -89,6 +89,7 @@ public: fetchInventoryResponder(const LLSD& request_sd) : mRequestSD(request_sd) {}; /*virtual*/ void result(const LLSD& content); /*virtual*/ void error(U32 status, const std::string& reason); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_inventory; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return fetchInventoryResponder_timeout; } /*virtual*/ char const* getName(void) const { return "fetchInventoryResponder"; } protected: diff --git a/indra/newview/llinventorymodelbackgroundfetch.cpp b/indra/newview/llinventorymodelbackgroundfetch.cpp index dba9c9836..6a828db9b 100644 --- a/indra/newview/llinventorymodelbackgroundfetch.cpp +++ b/indra/newview/llinventorymodelbackgroundfetch.cpp @@ -420,6 +420,7 @@ class LLInventoryModelFetchDescendentsResponder : public LLHTTPClient::Responder {}; /*virtual*/ void result(const LLSD& content); /*virtual*/ void error(U32 status, const std::string& reason); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_inventory; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return inventoryModelFetchDescendentsResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLInventoryModelFetchDescendentsResponder"; } @@ -652,7 +653,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() { if (folder_lib_count == max_batch_size || (folder_lib_count == 0 && - !(approved_folder_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + !(approved_folder_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr, cap_inventory)))) { break; } @@ -663,7 +664,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() { if (folder_count == max_batch_size || (folder_count == 0 && - !(approved_folder = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + !(approved_folder = AIPerService::approveHTTPRequestFor(mPerServicePtr, cap_inventory)))) { break; } @@ -701,7 +702,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() { if (item_count == max_batch_size || (item_count == 0 && - !(approved_item = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + !(approved_item = AIPerService::approveHTTPRequestFor(mPerServicePtr, cap_inventory)))) { break; } @@ -712,7 +713,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() { if (item_lib_count == max_batch_size || (item_lib_count == 0 && - !(approved_item_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + !(approved_item_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr, cap_inventory)))) { break; } diff --git a/indra/newview/llmeshrepository.cpp b/indra/newview/llmeshrepository.cpp index 0caa10716..d380609fd 100644 --- a/indra/newview/llmeshrepository.cpp +++ b/indra/newview/llmeshrepository.cpp @@ -229,6 +229,7 @@ public: const LLChannelDescriptors& channels, const LLIOPipe::buffer_ptr_t& buffer); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_mesh; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return meshHeaderResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLMeshHeaderResponder"; } }; @@ -256,6 +257,7 @@ public: const LLChannelDescriptors& channels, const LLIOPipe::buffer_ptr_t& buffer); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_mesh; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return meshLODResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLMeshLODResponder"; } }; @@ -276,6 +278,7 @@ public: const LLChannelDescriptors& channels, const LLIOPipe::buffer_ptr_t& buffer); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_mesh; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return meshSkinInfoResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLMeshSkinInfoResponder"; } }; @@ -296,6 +299,7 @@ public: const LLChannelDescriptors& channels, const LLIOPipe::buffer_ptr_t& buffer); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_mesh; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return meshDecompositionResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLMeshDecompositionResponder"; } }; @@ -316,6 +320,7 @@ public: const LLChannelDescriptors& channels, const LLIOPipe::buffer_ptr_t& buffer); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_mesh; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return meshPhysicsShapeResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLMeshPhysicsShapeResponder"; } }; diff --git a/indra/newview/lltexturefetch.cpp b/indra/newview/lltexturefetch.cpp index 0d58442c8..cdc31ae43 100644 --- a/indra/newview/lltexturefetch.cpp +++ b/indra/newview/lltexturefetch.cpp @@ -413,11 +413,8 @@ public: } } - /*virtual*/ bool followRedir() const - { - return mFollowRedir; - } - + /*virtual*/ bool followRedir() const { return mFollowRedir; } + /*virtual*/ AICapabilityType capability_type(void) const { return cap_texture; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return HTTPGetResponder_timeout; } /*virtual*/ char const* getName(void) const { return "HTTPGetResponder"; } @@ -1275,7 +1272,7 @@ bool LLTextureFetchWorker::doWork(S32 param) // AIPerService::approveHTTPRequestFor returns approvement for ONE request. // This object keeps track of whether or not that is honored. - LLPointer approved = AIPerService::approveHTTPRequestFor(mPerServicePtr); + LLPointer approved = AIPerService::approveHTTPRequestFor(mPerServicePtr, cap_texture); if (!approved) { return false ; //wait.