diff --git a/indra/llmessage/aicurl.cpp b/indra/llmessage/aicurl.cpp index 248568d7e..e5dcb6ae8 100644 --- a/indra/llmessage/aicurl.cpp +++ b/indra/llmessage/aicurl.cpp @@ -1248,10 +1248,10 @@ AIPerServicePtr CurlEasyRequest::getPerServicePtr(void) return mPerServicePtr; } -bool CurlEasyRequest::removeFromPerServiceQueue(AICurlEasyRequest const& easy_request) const +bool CurlEasyRequest::removeFromPerServiceQueue(AICurlEasyRequest const& easy_request, AICapabilityType capability_type) const { // Note that easy_request (must) represent(s) this object; it's just passed for convenience. - return mPerServicePtr && PerService_wat(*mPerServicePtr)->cancel(easy_request); + return mPerServicePtr && PerService_wat(*mPerServicePtr)->cancel(easy_request, capability_type); } std::string CurlEasyRequest::getLowercaseHostname(void) const @@ -1270,7 +1270,7 @@ bool BufferedCurlEasyRequest::sShuttingDown = false; AIAverage BufferedCurlEasyRequest::sHTTPBandwidth(25); BufferedCurlEasyRequest::BufferedCurlEasyRequest() : - mRequestTransferedBytes(0), mTotalRawBytes(0), mStatus(HTTP_INTERNAL_ERROR_OTHER), mBufferEventsTarget(NULL) + mRequestTransferedBytes(0), mTotalRawBytes(0), mStatus(HTTP_INTERNAL_ERROR_OTHER), mBufferEventsTarget(NULL), mCapabilityType(number_of_capability_types) { AICurlInterface::Stats::BufferedCurlEasyRequest_count++; } @@ -1402,6 +1402,9 @@ void BufferedCurlEasyRequest::prepRequest(AICurlEasyRequest_wat& curl_easy_reque // Keep responder alive. mResponder = responder; + // Cache capability type, because it will be needed even after the responder was removed. + mCapabilityType = responder->capability_type(); + // Send header events to responder if needed. 
if (mResponder->needsHeaders()) { diff --git a/indra/llmessage/aicurlperservice.cpp b/indra/llmessage/aicurlperservice.cpp index e97a18969..ff452c358 100644 --- a/indra/llmessage/aicurlperservice.cpp +++ b/indra/llmessage/aicurlperservice.cpp @@ -71,14 +71,24 @@ void intrusive_ptr_release(RefCountedThreadSafePerService* per_service) using namespace AICurlPrivate; AIPerService::AIPerService(void) : - mApprovedRequests(0), mQueuedCommands(0), mAdded(0), mQueueEmpty(false), - mQueueFull(false), mRequestStarvation(false), mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms. + mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms. mConcurrectConnections(CurlConcurrentConnectionsPerService), + mTotalAdded(0), + mApprovedFirst(0), + mUnapprovedFirst(0) +{ +} + +AIPerService::CapabilityType::CapabilityType(void) : + mApprovedRequests(0), + mQueuedCommands(0), + mAdded(0), + mFlags(0), mMaxPipelinedRequests(CurlConcurrentConnectionsPerService) { } -AIPerService::~AIPerService() +AIPerService::CapabilityType::~CapabilityType() { } @@ -212,12 +222,20 @@ void AIPerService::release(AIPerServicePtr& instance) // Therefore, recheck the condition now that we have locked sInstanceMap. if (!instance->exactly_two_left()) { - // Some other thread added this host in the meantime. + // Some other thread added this service in the meantime. return; } - // The reference in the map is the last one; that means there can't be any curl easy requests queued for this host. - llassert(PerService_rat(*instance)->mQueuedRequests.empty()); - // Find the host and erase it from the map. +#ifdef SHOW_ASSERT + { + // The reference in the map is the last one; that means there can't be any curl easy requests queued for this service. + PerService_rat per_service_r(*instance); + for (int i = 0; i < number_of_capability_types; ++i) + { + llassert(per_service_r->mCapabilityType[i].mQueuedRequests.empty()); + } + } +#endif + // Find the service and erase it from the map. 
iterator const end = instance_map_w->end(); for(iterator iter = instance_map_w->begin(); iter != end; ++iter) { @@ -228,7 +246,7 @@ void AIPerService::release(AIPerServicePtr& instance) return; } } - // We should always find the host. + // We should always find the service. llassert(false); } instance.reset(); @@ -236,30 +254,32 @@ void AIPerService::release(AIPerServicePtr& instance) bool AIPerService::throttled() const { - return mAdded >= mConcurrectConnections; + return mTotalAdded >= mConcurrectConnections; } -void AIPerService::added_to_multi_handle(void) +void AIPerService::added_to_multi_handle(AICapabilityType capability_type) { - ++mAdded; + ++mCapabilityType[capability_type].mAdded; + ++mTotalAdded; } -void AIPerService::removed_from_multi_handle(void) +void AIPerService::removed_from_multi_handle(AICapabilityType capability_type) { - --mAdded; - llassert(mAdded >= 0); + --mCapabilityType[capability_type].mAdded; + --mTotalAdded; + llassert(mTotalAdded >= 0 && mCapabilityType[capability_type].mAdded >= 0); } -void AIPerService::queue(AICurlEasyRequest const& easy_request) +void AIPerService::queue(AICurlEasyRequest const& easy_request, AICapabilityType capability_type) { - mQueuedRequests.push_back(easy_request.get_ptr()); + mCapabilityType[capability_type].mQueuedRequests.push_back(easy_request.get_ptr()); TotalQueued_wat(sTotalQueued)->count++; } -bool AIPerService::cancel(AICurlEasyRequest const& easy_request) +bool AIPerService::cancel(AICurlEasyRequest const& easy_request, AICapabilityType capability_type) { - queued_request_type::iterator const end = mQueuedRequests.end(); - queued_request_type::iterator cur = std::find(mQueuedRequests.begin(), end, easy_request.get_ptr()); + CapabilityType::queued_request_type::iterator const end = mCapabilityType[capability_type].mQueuedRequests.end(); + CapabilityType::queued_request_type::iterator cur = std::find(mCapabilityType[capability_type].mQueuedRequests.begin(), end, easy_request.get_ptr()); if (cur == 
end) return false; // Not found. @@ -270,62 +290,122 @@ bool AIPerService::cancel(AICurlEasyRequest const& easy_request) // want to break the order in which requests where added). Swap is also not // thread-safe, but OK here because it only touches the objects in the deque, // and the deque is protected by the lock on the AIPerService object. - queued_request_type::iterator prev = cur; + CapabilityType::queued_request_type::iterator prev = cur; while (++cur != end) { prev->swap(*cur); // This is safe, prev = cur; } - mQueuedRequests.pop_back(); // if this is safe. + mCapabilityType[capability_type].mQueuedRequests.pop_back(); // if this is safe. TotalQueued_wat total_queued_w(sTotalQueued); total_queued_w->count--; llassert(total_queued_w->count >= 0); return true; } -void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle) +void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle, bool recursive) { - if (!mQueuedRequests.empty()) + int order[number_of_capability_types]; + // The first two types are approved types, they should be the first to try. + // Try the one that has the largest queue first; if the queues have equal size, try mApprovedFirst first. + size_t s0 = mCapabilityType[0].mQueuedRequests.size(); + size_t s1 = mCapabilityType[1].mQueuedRequests.size(); + if (s0 == s1) { - if (!multi_handle->add_easy_request(mQueuedRequests.front(), true)) - { - // Throttled. - return; - } - mQueuedRequests.pop_front(); - if (mQueuedRequests.empty()) - { - // We obtained a request from the queue, and after that there we no more request in the queue of this host. - mQueueEmpty = true; - } - else - { - // We obtained a request from the queue, and even after that there was at least one more request in the queue of this host. 
- mQueueFull = true; - } - TotalQueued_wat total_queued_w(sTotalQueued); - llassert(total_queued_w->count > 0); - if (!--(total_queued_w->count)) - { - // We obtained a request from the queue, and after that there we no more request in any queue. - total_queued_w->empty = true; - } - else - { - // We obtained a request from the queue, and even after that there was at least one more request in some queue. - total_queued_w->full = true; - } + order[0] = mApprovedFirst; + mApprovedFirst = 1 - mApprovedFirst; + order[1] = mApprovedFirst; + } + else if (s0 > s1) + { + order[0] = 0; + order[1] = 1; } else { - // We can add a new request, but there is none in the queue! - mRequestStarvation = true; - TotalQueued_wat total_queued_w(sTotalQueued); - if (total_queued_w->count == 0) + order[0] = 1; + order[1] = 0; + } + // The next two types are unapproved types. Here, try them alternating regardless of queue size. + int n = mUnapprovedFirst; + for (int i = 2; i < number_of_capability_types; ++i, n = (n + 1) % (number_of_capability_types - 2)) + { + order[i] = 2 + n; + } + mUnapprovedFirst = (mUnapprovedFirst + 1) % (number_of_capability_types - 2); + + for (int i = 0; i < number_of_capability_types; ++i) + { + CapabilityType& ct(mCapabilityType[order[i]]); + if (!ct.mQueuedRequests.empty()) { - // The queue of every host is empty! - total_queued_w->starvation = true; + if (!multi_handle->add_easy_request(ct.mQueuedRequests.front(), true)) + { + // Throttled. If this failed then every capability type will fail: we either are using too much bandwidth, or too many total connections. + // However, it MAY be that this service was throttled for using too much bandwidth by itself. Look if other services can be added. + break; + } + // Request was added, remove it from the queue. + ct.mQueuedRequests.pop_front(); + if (ct.mQueuedRequests.empty()) + { + // We obtained a request from the queue, and after that there were no more requests in the queue of this service. 
+ ct.mFlags |= ctf_empty; + } + else + { + // We obtained a request from the queue, and even after that there was at least one more request in the queue of this service. + ct.mFlags |= ctf_full; + } + TotalQueued_wat total_queued_w(sTotalQueued); + llassert(total_queued_w->count > 0); + if (!--(total_queued_w->count)) + { + // We obtained a request from the queue, and after that there were no more requests in any queue. + total_queued_w->empty = true; + } + else + { + // We obtained a request from the queue, and even after that there was at least one more request in some queue. + total_queued_w->full = true; + } + // We added something from a queue, so we're done. + return; } + else + { + // We could add a new request, but there is none in the queue! + // Note that if this service does not serve this capability type, + // then obviously this queue was empty; however, in that case + // this variable will never be looked at, so it's ok to set it. + ct.mFlags |= ctf_starvation; + } + if (i == number_of_capability_types - 1) + { + // Last entry also empty. All queues of this service were empty. Check total connections. + TotalQueued_wat total_queued_w(sTotalQueued); + if (total_queued_w->count == 0) + { + // The queue of every service is empty! + total_queued_w->starvation = true; + return; + } + } + } + if (recursive) + { + return; + } + // Nothing from this service could be added, try other services. 
+ instance_map_wat instance_map_w(sInstanceMap); + for (iterator service = instance_map_w->begin(); service != instance_map_w->end(); ++service) + { + PerService_wat per_service_w(*service->second); + if (&*per_service_w == this) + { + continue; + } + per_service_w->add_queued_to(multi_handle, true); } } @@ -333,15 +413,18 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle) void AIPerService::purge(void) { instance_map_wat instance_map_w(sInstanceMap); - for (iterator host = instance_map_w->begin(); host != instance_map_w->end(); ++host) + for (iterator service = instance_map_w->begin(); service != instance_map_w->end(); ++service) { - Dout(dc::curl, "Purging queue of host \"" << host->first << "\"."); - PerService_wat per_service_w(*host->second); - size_t s = per_service_w->mQueuedRequests.size(); - per_service_w->mQueuedRequests.clear(); + Dout(dc::curl, "Purging queues of service \"" << service->first << "\"."); + PerService_wat per_service_w(*service->second); TotalQueued_wat total_queued_w(sTotalQueued); - total_queued_w->count -= s; - llassert(total_queued_w->count >= 0); + for (int i = 0; i < number_of_capability_types; ++i) + { + size_t s = per_service_w->mCapabilityType[i].mQueuedRequests.size(); + per_service_w->mCapabilityType[i].mQueuedRequests.clear(); + total_queued_w->count -= s; + llassert(total_queued_w->count >= 0); + } } } @@ -355,7 +438,10 @@ void AIPerService::adjust_concurrent_connections(int increment) U32 old_concurrent_connections = per_service_w->mConcurrectConnections; per_service_w->mConcurrectConnections = llclamp(old_concurrent_connections + increment, (U32)1, CurlConcurrentConnectionsPerService); increment = per_service_w->mConcurrectConnections - old_concurrent_connections; - per_service_w->mMaxPipelinedRequests = llmax(per_service_w->mMaxPipelinedRequests + increment, 0); + for (int i = 0; i < number_of_capability_types; ++i) + { + per_service_w->mCapabilityType[i].mMaxPipelinedRequests = 
llmax(per_service_w->mCapabilityType[i].mMaxPipelinedRequests + increment, (U32)0); + } } } @@ -365,8 +451,8 @@ void AIPerService::Approvement::honored(void) { mHonored = true; AICurlPrivate::PerService_wat per_service_w(*mPerServicePtr); - llassert(per_service_w->mApprovedRequests > 0); - per_service_w->mApprovedRequests--; + llassert(per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests > 0); + per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests--; } } diff --git a/indra/llmessage/aicurlperservice.h b/indra/llmessage/aicurlperservice.h index 5bcc28063..f50c1f393 100644 --- a/indra/llmessage/aicurlperservice.h +++ b/indra/llmessage/aicurlperservice.h @@ -72,6 +72,18 @@ typedef AIAccess PerService_wat; // Therefore, use an intrusive pointer for the threadsafe type. typedef boost::intrusive_ptr AIPerServicePtr; +//----------------------------------------------------------------------------- +// + +enum AICapabilityType { // {Capabilities} [Responders] + cap_texture = 0, // GetTexture [HTTPGetResponder] + cap_inventory = 1, // { FetchInventory2, FetchLib2 } [LLInventoryModel::fetchInventoryResponder], { FetchInventoryDescendents2, FetchLibDescendents2 } [LLInventoryModelFetchDescendentsResponder] + cap_mesh = 2, // GetMesh [LLMeshSkinInfoResponder, LLMeshDecompositionResponder, LLMeshPhysicsShapeResponder, LLMeshHeaderResponder, LLMeshLODResponder] + cap_other = 3, // All other capabilities + + number_of_capability_types = 4 +}; + //----------------------------------------------------------------------------- // AIPerService @@ -92,9 +104,6 @@ class AIPerService { friend class AIThreadSafeSimpleDC; // threadsafe_PerService AIPerService(void); - public: - ~AIPerService(); - public: typedef instance_map_type::iterator iterator; typedef instance_map_type::const_iterator const_iterator; @@ -112,20 +121,36 @@ class AIPerService { static void purge(void); private: - typedef std::deque queued_request_type; + static U16 const ctf_empty = 1; + static 
U16 const ctf_full = 2; + static U16 const ctf_starvation = 4; - int mApprovedRequests; // The number of approved requests by approveHTTPRequestFor that were not added to the command queue yet. - int mQueuedCommands; // Number of add commands (minus remove commands) with this host in the command queue. - int mAdded; // Number of active easy handles with this host. - queued_request_type mQueuedRequests; // Waiting (throttled) requests. + struct CapabilityType { + typedef std::deque queued_request_type; - bool mQueueEmpty; // Set to true when the queue becomes precisely empty. - bool mQueueFull; // Set to true when the queue is popped and then still isn't empty; - bool mRequestStarvation; // Set to true when the queue was about to be popped but was already empty. + queued_request_type mQueuedRequests; // Waiting (throttled) requests. + U16 mApprovedRequests; // The number of approved requests by approveHTTPRequestFor that were not added to the command queue yet. + U16 mQueuedCommands; // Number of add commands (minus remove commands), for this service, in the command queue. + U16 mAdded; // Number of active easy handles with this service. + U16 mFlags; // ctf_empty: Set to true when the queue becomes precisely empty. + // ctf_full : Set to true when the queue is popped and then still isn't empty; + // ctf_starvation: Set to true when the queue was about to be popped but was already empty. + U32 mMaxPipelinedRequests; // The maximum number of accepted requests for this service and (approved) capability type, that didn't finish yet. + + // Declare, not define, constructor and destructor - in order to avoid instantiation of queued_request_type from header. 
+ CapabilityType(void); + ~CapabilityType(); + + S32 pipelined_requests(void) const { return mApprovedRequests + mQueuedCommands + mQueuedRequests.size() + mAdded; } + }; + + CapabilityType mCapabilityType[number_of_capability_types]; AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second. int mConcurrectConnections; // The maximum number of allowed concurrent connections to this service. - int mMaxPipelinedRequests; // The maximum number of accepted requests for this service that didn't finish yet. + int mTotalAdded; // Number of active easy handles with this host. + int mApprovedFirst; // First capability type to try. + int mUnapprovedFirst; // First capability type to try after all approved types were tried. // Global administration of the total number of queued requests of all services combined. private: @@ -176,20 +201,20 @@ class AIPerService { static bool sNoHTTPBandwidthThrottling; // Global override to disable bandwidth throttling. public: - void added_to_command_queue(void) { ++mQueuedCommands; } - void removed_from_command_queue(void) { --mQueuedCommands; llassert(mQueuedCommands >= 0); } - void added_to_multi_handle(void); // Called when an easy handle for this host has been added to the multi handle. - void removed_from_multi_handle(void); // Called when an easy handle for this host is removed again from the multi handle. + void added_to_command_queue(AICapabilityType capability_type) { ++mCapabilityType[capability_type].mQueuedCommands; } + void removed_from_command_queue(AICapabilityType capability_type) { --mCapabilityType[capability_type].mQueuedCommands; llassert(mCapabilityType[capability_type].mQueuedCommands >= 0); } + void added_to_multi_handle(AICapabilityType capability_type); // Called when an easy handle for this host has been added to the multi handle. 
+ void removed_from_multi_handle(AICapabilityType capability_type); // Called when an easy handle for this host is removed again from the multi handle. bool throttled(void) const; // Returns true if the maximum number of allowed requests for this host have been added to the multi handle. - void queue(AICurlEasyRequest const& easy_request); // Add easy_request to the queue. - bool cancel(AICurlEasyRequest const& easy_request); // Remove easy_request from the queue (if it's there). + void queue(AICurlEasyRequest const& easy_request, AICapabilityType capability_type); // Add easy_request to the queue. + bool cancel(AICurlEasyRequest const& easy_request, AICapabilityType capability_type); // Remove easy_request from the queue (if it's there). - void add_queued_to(AICurlPrivate::curlthread::MultiHandle* mh); + void add_queued_to(AICurlPrivate::curlthread::MultiHandle* mh, bool recursive = false); // Add queued easy handle (if any) to the multi handle. The request is removed from the queue, // followed by either a call to added_to_multi_handle() or to queue() to add it back. 
- S32 pipelined_requests(void) const { return mApprovedRequests + mQueuedCommands + mQueuedRequests.size() + mAdded; } + S32 pipelined_requests(AICapabilityType capability_type) const { return mCapabilityType[capability_type].pipelined_requests(); } AIAverage& bandwidth(void) { return mHTTPBandwidth; } AIAverage const& bandwidth(void) const { return mHTTPBandwidth; } @@ -205,9 +230,10 @@ class AIPerService { class Approvement : public LLThreadSafeRefCount { private: AIPerServicePtr mPerServicePtr; + AICapabilityType mCapabilityType; bool mHonored; public: - Approvement(AIPerServicePtr const& per_service) : mPerServicePtr(per_service), mHonored(false) { } + Approvement(AIPerServicePtr const& per_service, AICapabilityType capability_type) : mPerServicePtr(per_service), mCapabilityType(capability_type), mHonored(false) { } ~Approvement() { if (!mHonored) not_honored(); } void honored(void); void not_honored(void); @@ -221,7 +247,7 @@ class AIPerService { // Returns approvement if curl can handle another request for this host. // Should return NULL if the maximum allowed HTTP bandwidth is reached, or when // the latency between request and actual delivery becomes too large. - static Approvement* approveHTTPRequestFor(AIPerServicePtr const& per_service); + static Approvement* approveHTTPRequestFor(AIPerServicePtr const& per_service, AICapabilityType capability_type); // Return true if too much bandwidth is being used. static bool checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms); diff --git a/indra/llmessage/aicurlprivate.h b/indra/llmessage/aicurlprivate.h index 90b04ad8f..aa5a1ba7b 100644 --- a/indra/llmessage/aicurlprivate.h +++ b/indra/llmessage/aicurlprivate.h @@ -358,8 +358,8 @@ class CurlEasyRequest : public CurlEasyHandle { // PerService API. AIPerServicePtr getPerServicePtr(void); // (Optionally create and) return a pointer to the unique // AIPerService corresponding to mLowercaseServicename. 
- bool removeFromPerServiceQueue(AICurlEasyRequest const&) const; // Remove this request from the per-host queue, if queued at all. - // Returns true if it was queued. + bool removeFromPerServiceQueue(AICurlEasyRequest const&, AICapabilityType capability_type) const; // Remove this request from the per-host queue, if queued at all. + // Returns true if it was queued. protected: // Pass events to parent. /*virtual*/ void added_to_multi_handle(AICurlEasyRequest_wat& curl_easy_request_w); @@ -417,6 +417,7 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { U8* mLastRead; // Pointer into mInput where we last stopped reading (or NULL to start at the beginning). buffer_ptr_t mOutput; LLHTTPClient::ResponderPtr mResponder; + AICapabilityType mCapabilityType; //U32 mBodyLimit; // From the old LLURLRequestDetail::mBodyLimit, but never used. U32 mStatus; // HTTP status, decoded from the first header line. std::string mReason; // The "reason" from the same header line. @@ -459,6 +460,9 @@ class BufferedCurlEasyRequest : public CurlEasyRequest { // Return true when prepRequest was already called and the object has not been // invalidated as a result of calling timed_out(). bool isValid(void) const { return mResponder; } + + // Return the capability type of this request. + AICapabilityType capability_type(void) const { llassert(mCapabilityType != number_of_capability_types); return mCapabilityType; } }; inline ThreadSafeBufferedCurlEasyRequest* CurlEasyRequest::get_lockobj(void) diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index 70c041a1f..2611ef7bf 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -1325,19 +1325,30 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w) // Access command_being_processed only. 
{ command_being_processed_rat command_being_processed_r(command_being_processed); + AICapabilityType capability_type; + AIPerServicePtr per_service; + { + AICurlEasyRequest_wat easy_request_w(*command_being_processed_r->easy_request()); + capability_type = easy_request_w->capability_type(); + per_service = easy_request_w->getPerServicePtr(); + } switch(command_being_processed_r->command()) { case cmd_none: case cmd_boost: // FIXME: future stuff break; case cmd_add: + { multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), false); - PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue(); + PerService_wat(*per_service)->removed_from_command_queue(capability_type); break; + } case cmd_remove: - PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'. + { + PerService_wat(*per_service)->added_to_command_queue(capability_type); // Not really, but this has the same effect as 'removed a remove command'. multi_handle_w->remove_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), true); break; + } } // Done processing. command_being_processed_wat command_being_processed_w(command_being_processed_r); @@ -1704,18 +1715,21 @@ static U32 curl_max_total_concurrent_connections = 32; // Initialized on st bool MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request, bool from_queue) { bool throttled = true; // Default. 
+ AICapabilityType capability_type; AIPerServicePtr per_service; { AICurlEasyRequest_wat curl_easy_request_w(*easy_request); + capability_type = curl_easy_request_w->capability_type(); per_service = curl_easy_request_w->getPerServicePtr(); - bool too_much_bandwidth = !curl_easy_request_w->approved() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms); + // Never throttle on bandwidth if there are no handles running (sTotalAdded == 1, the long poll connection). + bool too_much_bandwidth = sTotalAdded > 1 && !curl_easy_request_w->approved() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms); PerService_wat per_service_w(*per_service); - if (!too_much_bandwidth && mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled()) + if (!too_much_bandwidth && sTotalAdded < curl_max_total_concurrent_connections && !per_service_w->throttled()) { curl_easy_request_w->set_timeout_opts(); if (curl_easy_request_w->add_handle_to_multi(curl_easy_request_w, mMultiHandle) == CURLM_OK) { - per_service_w->added_to_multi_handle(); // (About to be) added to mAddedEasyRequests. + per_service_w->added_to_multi_handle(capability_type); // (About to be) added to mAddedEasyRequests. throttled = false; // Fall through... } } @@ -1739,7 +1753,7 @@ bool MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request, bool f return false; } // The request could not be added, we have to queue it. - PerService_wat(*per_service)->queue(easy_request); + PerService_wat(*per_service)->queue(easy_request, capability_type); #ifdef SHOW_ASSERT // Not active yet, but it's no longer an error if next we try to remove the request. 
AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = false; @@ -1757,7 +1771,7 @@ CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request #ifdef SHOW_ASSERT bool removed = #endif - easy_request_w->removeFromPerServiceQueue(easy_request); + easy_request_w->removeFromPerServiceQueue(easy_request, easy_request_w->capability_type()); #ifdef SHOW_ASSERT if (removed) { @@ -1773,12 +1787,14 @@ CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator const& iter, bool as_per_command) { CURLMcode res; + AICapabilityType capability_type; AIPerServicePtr per_service; { AICurlEasyRequest_wat curl_easy_request_w(**iter); res = curl_easy_request_w->remove_handle_from_multi(curl_easy_request_w, mMultiHandle); + capability_type = curl_easy_request_w->capability_type(); per_service = curl_easy_request_w->getPerServicePtr(); - PerService_wat(*per_service)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests. + PerService_wat(*per_service)->removed_from_multi_handle(capability_type); // (About to be) removed from mAddedEasyRequests. #ifdef SHOW_ASSERT curl_easy_request_w->mRemovedPerCommand = as_per_command; #endif @@ -2429,7 +2445,7 @@ void AICurlEasyRequest::addRequest(void) command_queue_w->commands.push_back(Command(*this, cmd_add)); command_queue_w->size++; AICurlEasyRequest_wat curl_easy_request_w(*get()); - PerService_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue(); + PerService_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue(curl_easy_request_w->capability_type()); curl_easy_request_w->add_queued(); } // Something was added to the queue, wake up the thread to get it. 
@@ -2493,7 +2509,7 @@ void AICurlEasyRequest::removeRequest(void) command_queue_w->commands.push_back(Command(*this, cmd_remove)); command_queue_w->size--; AICurlEasyRequest_wat curl_easy_request_w(*get()); - PerService_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(); // Note really, but this has the same effect as 'added a remove command'. + PerService_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(curl_easy_request_w->capability_type()); // Not really, but this has the same effect as 'added a remove command'. // Suppress warning that would otherwise happen if the callbacks are revoked before the curl thread removed the request. curl_easy_request_w->remove_queued(); } @@ -2614,7 +2630,7 @@ bool AIPerService::sNoHTTPBandwidthThrottling; // running requests (in MultiHandle::mAddedEasyRequests)). // //static -AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr const& per_service) +AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr const& per_service, AICapabilityType capability_type) { using namespace AICurlPrivate; using namespace AICurlPrivate::curlthread; @@ -2650,31 +2666,30 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c } } - // Check if it's ok to get a new request for this particular service and update the per-service threshold. + // Check if it's ok to get a new request for this particular capability type and update the per-capability-type threshold. 
bool reject, equal, increment_threshold; { PerService_wat per_service_w(*per_service); - S32 const pipelined_requests_per_service = per_service_w->pipelined_requests(); - //llassert(pipelined_requests_per_service >= 0 && pipelined_requests_per_service <= 16); - reject = pipelined_requests_per_service >= per_service_w->mMaxPipelinedRequests; - equal = pipelined_requests_per_service == per_service_w->mMaxPipelinedRequests; - increment_threshold = per_service_w->mRequestStarvation; - decrement_threshold = per_service_w->mQueueFull && !per_service_w->mQueueEmpty; - // Reset flags. - per_service_w->mQueueFull = per_service_w->mQueueEmpty = per_service_w->mRequestStarvation = false; + CapabilityType& ct(per_service_w->mCapabilityType[capability_type]); + S32 const pipelined_requests_per_capability_type = ct.pipelined_requests(); + reject = pipelined_requests_per_capability_type >= ct.mMaxPipelinedRequests; + equal = pipelined_requests_per_capability_type == ct.mMaxPipelinedRequests; + increment_threshold = ct.mFlags & ctf_starvation; + decrement_threshold = (ct.mFlags & (ctf_empty | ctf_full)) == ctf_full; + ct.mFlags = 0; if (decrement_threshold) { - if (per_service_w->mMaxPipelinedRequests > per_service_w->mConcurrectConnections) + if (ct.mMaxPipelinedRequests > per_service_w->mConcurrectConnections) { - per_service_w->mMaxPipelinedRequests--; + ct.mMaxPipelinedRequests--; } } else if (increment_threshold && reject) { - if (per_service_w->mMaxPipelinedRequests < 2 * per_service_w->mConcurrectConnections) + if (ct.mMaxPipelinedRequests < 2 * per_service_w->mConcurrectConnections) { - per_service_w->mMaxPipelinedRequests++; + ct.mMaxPipelinedRequests++; // Immediately take the new threshold into account. 
reject = !equal; } @@ -2683,8 +2698,7 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c { // Before releasing the lock on per_service, stop other threads from getting a // too small value from pipelined_requests() and approving too many requests. - per_service_w->mApprovedRequests++; - //llassert(per_service_w->mApprovedRequests <= 16); + ct.mApprovedRequests++; } } if (reject) @@ -2697,7 +2711,7 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c if (checkBandwidthUsage(per_service, sTime_40ms)) { // Too much bandwidth is being used, either in total or for this service. - PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all. + PerService_wat(*per_service)->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. return NULL; } @@ -2731,10 +2745,10 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c } if (reject) { - PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all. + PerService_wat(*per_service)->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all. return NULL; } - return new Approvement(per_service); + return new Approvement(per_service, capability_type); } bool AIPerService::checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms) diff --git a/indra/llmessage/llhttpclient.h b/indra/llmessage/llhttpclient.h index e376530e5..af11a6687 100644 --- a/indra/llmessage/llhttpclient.h +++ b/indra/llmessage/llhttpclient.h @@ -224,6 +224,9 @@ public: // If this function returns false then we generate an error when a redirect status (300..399) is received. virtual bool redirect_status_ok(void) const { return followRedir(); } + // Returns the capability type used by this responder. + virtual AICapabilityType capability_type(void) const { return cap_other; } + // Timeout policy to use. 
virtual AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const = 0; diff --git a/indra/newview/llinventorymodel.h b/indra/newview/llinventorymodel.h index 3f87c2615..a73a1fee6 100644 --- a/indra/newview/llinventorymodel.h +++ b/indra/newview/llinventorymodel.h @@ -89,6 +89,7 @@ public: fetchInventoryResponder(const LLSD& request_sd) : mRequestSD(request_sd) {}; /*virtual*/ void result(const LLSD& content); /*virtual*/ void error(U32 status, const std::string& reason); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_inventory; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return fetchInventoryResponder_timeout; } /*virtual*/ char const* getName(void) const { return "fetchInventoryResponder"; } protected: diff --git a/indra/newview/llinventorymodelbackgroundfetch.cpp b/indra/newview/llinventorymodelbackgroundfetch.cpp index dba9c9836..6a828db9b 100644 --- a/indra/newview/llinventorymodelbackgroundfetch.cpp +++ b/indra/newview/llinventorymodelbackgroundfetch.cpp @@ -420,6 +420,7 @@ class LLInventoryModelFetchDescendentsResponder : public LLHTTPClient::Responder {}; /*virtual*/ void result(const LLSD& content); /*virtual*/ void error(U32 status, const std::string& reason); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_inventory; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return inventoryModelFetchDescendentsResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLInventoryModelFetchDescendentsResponder"; } @@ -652,7 +653,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() { if (folder_lib_count == max_batch_size || (folder_lib_count == 0 && - !(approved_folder_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + !(approved_folder_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr, cap_inventory)))) { break; } @@ -663,7 +664,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() { if (folder_count == max_batch_size 
|| (folder_count == 0 && - !(approved_folder = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + !(approved_folder = AIPerService::approveHTTPRequestFor(mPerServicePtr, cap_inventory)))) { break; } @@ -701,7 +702,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() { if (item_count == max_batch_size || (item_count == 0 && - !(approved_item = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + !(approved_item = AIPerService::approveHTTPRequestFor(mPerServicePtr, cap_inventory)))) { break; } @@ -712,7 +713,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch() { if (item_lib_count == max_batch_size || (item_lib_count == 0 && - !(approved_item_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr)))) + !(approved_item_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr, cap_inventory)))) { break; } diff --git a/indra/newview/llmeshrepository.cpp b/indra/newview/llmeshrepository.cpp index 0caa10716..d380609fd 100644 --- a/indra/newview/llmeshrepository.cpp +++ b/indra/newview/llmeshrepository.cpp @@ -229,6 +229,7 @@ public: const LLChannelDescriptors& channels, const LLIOPipe::buffer_ptr_t& buffer); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_mesh; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return meshHeaderResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLMeshHeaderResponder"; } }; @@ -256,6 +257,7 @@ public: const LLChannelDescriptors& channels, const LLIOPipe::buffer_ptr_t& buffer); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_mesh; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return meshLODResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLMeshLODResponder"; } }; @@ -276,6 +278,7 @@ public: const LLChannelDescriptors& channels, const LLIOPipe::buffer_ptr_t& buffer); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_mesh; } /*virtual*/ 
AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return meshSkinInfoResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLMeshSkinInfoResponder"; } }; @@ -296,6 +299,7 @@ public: const LLChannelDescriptors& channels, const LLIOPipe::buffer_ptr_t& buffer); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_mesh; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return meshDecompositionResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLMeshDecompositionResponder"; } }; @@ -316,6 +320,7 @@ public: const LLChannelDescriptors& channels, const LLIOPipe::buffer_ptr_t& buffer); + /*virtual*/ AICapabilityType capability_type(void) const { return cap_mesh; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return meshPhysicsShapeResponder_timeout; } /*virtual*/ char const* getName(void) const { return "LLMeshPhysicsShapeResponder"; } }; diff --git a/indra/newview/lltexturefetch.cpp b/indra/newview/lltexturefetch.cpp index 0d58442c8..cdc31ae43 100644 --- a/indra/newview/lltexturefetch.cpp +++ b/indra/newview/lltexturefetch.cpp @@ -413,11 +413,8 @@ public: } } - /*virtual*/ bool followRedir() const - { - return mFollowRedir; - } - + /*virtual*/ bool followRedir() const { return mFollowRedir; } + /*virtual*/ AICapabilityType capability_type(void) const { return cap_texture; } /*virtual*/ AIHTTPTimeoutPolicy const& getHTTPTimeoutPolicy(void) const { return HTTPGetResponder_timeout; } /*virtual*/ char const* getName(void) const { return "HTTPGetResponder"; } @@ -1275,7 +1272,7 @@ bool LLTextureFetchWorker::doWork(S32 param) // AIPerService::approveHTTPRequestFor returns approvement for ONE request. // This object keeps track of whether or not that is honored. 
- LLPointer approved = AIPerService::approveHTTPRequestFor(mPerServicePtr); + LLPointer approved = AIPerService::approveHTTPRequestFor(mPerServicePtr, cap_texture); if (!approved) { return false ; //wait.