From 748d339ee6351d43dc885c486c3c98afdc07f203 Mon Sep 17 00:00:00 2001 From: Aleric Inglewood Date: Mon, 8 Apr 2013 22:46:01 +0200 Subject: [PATCH] Move decision whether or not to add new HTTP request from texture fetcher to AICurl After this commit, things compile again :). The HTTP bandwidth throttling is not yet implemented. I'll put back a temporary fix in the next commit that just does it the "old way"... --- indra/llmessage/aicurl.h | 1 + indra/llmessage/aicurlperhost.cpp | 36 ++++++- indra/llmessage/aicurlperhost.h | 18 +++- indra/llmessage/aicurlthread.cpp | 136 ++++++++++++++++++++++++ indra/newview/app_settings/settings.xml | 24 +---- indra/newview/lltexturefetch.cpp | 30 +++--- 6 files changed, 199 insertions(+), 46 deletions(-) diff --git a/indra/llmessage/aicurl.h b/indra/llmessage/aicurl.h index 1b5953e7f..1e1b990db 100644 --- a/indra/llmessage/aicurl.h +++ b/indra/llmessage/aicurl.h @@ -52,6 +52,7 @@ #include "stdtypes.h" // U16, S32, U32, F64 #include "llatomic.h" // LLAtomicU32 #include "aithreadsafe.h" +#include "aicurlperhost.h" // AIPerHostRequestQueuePtr // Debug Settings. 
extern bool gNoVerifySSLCert; diff --git a/indra/llmessage/aicurlperhost.cpp b/indra/llmessage/aicurlperhost.cpp index 8e9da1334..1d2f157a9 100644 --- a/indra/llmessage/aicurlperhost.cpp +++ b/indra/llmessage/aicurlperhost.cpp @@ -38,6 +38,9 @@ AIPerHostRequestQueue::threadsafe_instance_map_type AIPerHostRequestQueue::sInstanceMap; LLAtomicS32 AIPerHostRequestQueue::sTotalQueued; +bool AIPerHostRequestQueue::sQueueEmpty; +bool AIPerHostRequestQueue::sQueueFull; +bool AIPerHostRequestQueue::sRequestStarvation; #undef AICurlPrivate @@ -221,8 +224,37 @@ void AIPerHostRequestQueue::add_queued_to(curlthread::MultiHandle* multi_handle) { multi_handle->add_easy_request(mQueuedRequests.front()); mQueuedRequests.pop_front(); - --sTotalQueued; - llassert(sTotalQueued >= 0); + llassert(sTotalQueued > 0); + if (!--sTotalQueued) + { + // We obtained a request from the queue, and after that there were no more requests in any queue. + sQueueEmpty = true; + } + else + { + // We obtained a request from the queue, and even after that there was at least one more request in some queue. + sQueueFull = true; + } + if (mQueuedRequests.empty()) + { + // We obtained a request from the queue, and after that there were no more requests in the queue of this host. + mQueueEmpty = true; + } + else + { + // We obtained a request from the queue, and even after that there was at least one more request in the queue of this host. + mQueueFull = true; + } + } + else + { + // We can add a new request, but there is none in the queue! + mRequestStarvation = true; + if (sTotalQueued == 0) + { + // The queue of every host is empty! 
+ sRequestStarvation = true; + } } } diff --git a/indra/llmessage/aicurlperhost.h b/indra/llmessage/aicurlperhost.h index 457b2cccd..8cfd3808b 100644 --- a/indra/llmessage/aicurlperhost.h +++ b/indra/llmessage/aicurlperhost.h @@ -85,7 +85,7 @@ class AIPerHostRequestQueue { static threadsafe_instance_map_type sInstanceMap; // Map of AIPerHostRequestQueue instances with the hostname as key. friend class AIThreadSafeSimpleDC; //threadsafe_PerHostRequestQueue - AIPerHostRequestQueue(void) : mQueuedCommands(0), mAdded(0) { } + AIPerHostRequestQueue(void) : mQueuedCommands(0), mAdded(0), mQueueEmpty(false), mQueueFull(false), mRequestStarvation(false) { } public: typedef instance_map_type::iterator iterator; @@ -112,6 +112,14 @@ class AIPerHostRequestQueue { static LLAtomicS32 sTotalQueued; // The sum of mQueuedRequests.size() of all AIPerHostRequestQueue objects together. + bool mQueueEmpty; // Set to true when the queue becomes precisely empty. + bool mQueueFull; // Set to true when the queue is popped and then still isn't empty; + bool mRequestStarvation; // Set to true when the queue was about to be popped but was already empty. + + static bool sQueueEmpty; // Set to true when sTotalQueued becomes precisely zero as the result of popping any queue. + static bool sQueueFull; // Set to true when sTotalQueued is still larger than zero after popping any queue. + static bool sRequestStarvation; // Set to true when any queue was about to be popped when sTotalQueued was already zero. + public: void added_to_command_queue(void) { ++mQueuedCommands; } void removed_from_command_queue(void) { --mQueuedCommands; llassert(mQueuedCommands >= 0); } @@ -126,10 +134,14 @@ class AIPerHostRequestQueue { // Add queued easy handle (if any) to the multi handle. The request is removed from the queue, // followed by either a call to added_to_multi_handle() or to queue() to add it back. 
- S32 queued_commands(void) const { return mQueuedCommands; } - S32 host_queued_plus_added_size(void) const { return mQueuedRequests.size() + mAdded; } + S32 pipelined_requests(void) const { return mQueuedCommands + mQueuedRequests.size() + mAdded; } static S32 total_queued_size(void) { return sTotalQueued; } + // Returns true if curl can handle another request for this host. + // Should return false if the maximum allowed HTTP bandwidth is reached, or when + // the latency between request and actual delivery becomes too large. + static bool wantsMoreHTTPRequestsFor(AIPerHostRequestQueuePtr const& per_host); + private: // Disallow copying. AIPerHostRequestQueue(AIPerHostRequestQueue const&) { } diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index 4cc147473..64fc1b780 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -204,6 +204,9 @@ int ioctlsocket(int fd, int, unsigned long* nonblocking_enable) namespace AICurlPrivate { +LLAtomicS32 max_pipelined_requests(32); +LLAtomicS32 max_pipelined_requests_per_host(8); + enum command_st { cmd_none, cmd_add, @@ -2476,6 +2479,8 @@ void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentCo curl_max_total_concurrent_connections = CurlMaxTotalConcurrentConnections; curl_concurrent_connections_per_host = CurlConcurrentConnectionsPerHost; gNoVerifySSLCert = NoVerifySSLCert; + max_pipelined_requests = curl_max_total_concurrent_connections; + max_pipelined_requests_per_host = curl_concurrent_connections_per_host; AICurlThread::sInstance = new AICurlThread; AICurlThread::sInstance->start(); @@ -2483,9 +2488,12 @@ void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentCo bool handleCurlMaxTotalConcurrentConnections(LLSD const& newvalue) { + using namespace AICurlPrivate; using namespace AICurlPrivate::curlthread; + U32 old = curl_max_total_concurrent_connections; curl_max_total_concurrent_connections = 
newvalue.asInteger(); + max_pipelined_requests += curl_max_total_concurrent_connections - old; llinfos << "CurlMaxTotalConcurrentConnections set to " << curl_max_total_concurrent_connections << llendl; return true; } @@ -2494,7 +2502,9 @@ bool handleCurlConcurrentConnectionsPerHost(LLSD const& newvalue) { using namespace AICurlPrivate; + U32 old = curl_concurrent_connections_per_host; curl_concurrent_connections_per_host = newvalue.asInteger(); + max_pipelined_requests_per_host += curl_concurrent_connections_per_host - old; llinfos << "CurlConcurrentConnectionsPerHost set to " << curl_concurrent_connections_per_host << llendl; return true; } @@ -2525,3 +2535,129 @@ U32 getNumHTTPAdded(void) } // namespace AICurlInterface +// Return true if we want at least one more HTTP request for this host. +// +// It's OK if this function is a bit fuzzy, but we don't want it to return +// true a hundred times on a row when it is called fast in a loop. +// Hence the following consideration: +// +// This function is called only from LLTextureFetchWorker::doWork, and when it returns true +// then doWork will call LLHTTPClient::request with a NULL default engine (signaling that +// it is OK to run in any thread). +// +// At the end, LLHTTPClient::request calls AIStateMachine::run, which in turn calls +// AIStateMachine::reset at the end. Because NULL is passed as default_engine, reset will +// call AIStateMachine::multiplex to immediately start running the state machine. This +// causes it to go through the states bs_reset, bs_initialize and then bs_multiplex with +// run state AICurlEasyRequestStateMachine_addRequest. Finally, in this state, multiplex +// calls AICurlEasyRequestStateMachine::multiplex_impl which then calls AICurlEasyRequest::addRequest +// which causes an increment of command_queue_w->size and AIPerHostRequestQueue::mQueuedCommands. 
+// +// It is therefore guaranteed that in one loop of LLTextureFetchWorker::doWork, +// this size is incremented; stopping this function from returning true once we reached the +// threshold of "pipelined" requests (the sum of requests in the command queue, the ones +// throttled and queued in AIPerHostRequestQueue::mQueuedRequests and the already +// running requests (in MultiHandle::mAddedEasyRequests)). +// +//static +bool AIPerHostRequestQueue::wantsMoreHTTPRequestsFor(AIPerHostRequestQueuePtr const& per_host) +{ + using namespace AICurlPrivate; + using namespace AICurlPrivate::curlthread; + + bool reject, equal, increment_threshold, decrement_threshold; + + // Whether or not we're going to approve a new request, decrement the global threshold first, when appropriate. + + // Atomic read max_pipelined_requests for the below calculations. + S32 const max_pipelined_requests_cache = max_pipelined_requests; + decrement_threshold = sQueueFull && !sQueueEmpty; + // Reset flags. + sQueueEmpty = sQueueFull = false; + if (decrement_threshold) + { + if (max_pipelined_requests_cache > curl_max_total_concurrent_connections) + { + // Decrement the threshold because since the last call to this function at least one curl request finished + // and was replaced with another request from the queue, but the queue never ran empty: we have too many + // queued requests. + --max_pipelined_requests; + } + } + + // Check if it's ok to get a new request for this particular host and update the per-host threshold. + + // Atomic read max_pipelined_requests_per_host for the below calculations. 
+ S32 const max_pipelined_requests_per_host_cache = max_pipelined_requests_per_host; + { + PerHostRequestQueue_rat per_host_r(*per_host); + S32 const pipelined_requests_per_host = per_host_r->pipelined_requests(); + reject = pipelined_requests_per_host >= max_pipelined_requests_per_host_cache; + equal = pipelined_requests_per_host == max_pipelined_requests_per_host_cache; + increment_threshold = per_host_r->mRequestStarvation; + decrement_threshold = per_host_r->mQueueFull && !per_host_r->mQueueEmpty; + // Reset flags. + per_host_r->mQueueFull = per_host_r->mQueueEmpty = per_host_r->mRequestStarvation = false; + } + if (decrement_threshold) + { + if (max_pipelined_requests_per_host_cache > curl_concurrent_connections_per_host) + { + --max_pipelined_requests_per_host; + } + } + else if (increment_threshold && reject) + { + if (max_pipelined_requests_per_host_cache < 2 * curl_concurrent_connections_per_host) + { + max_pipelined_requests_per_host++; + // Immediately take the new threshold into account. + reject = !equal; + } + } + if (reject) + { + // Too many requests for this host already. + return false; + } + +#if 0 + //AITODO: better bandwidth check here. + static const LLCachedControl throttle_bandwidth("HTTPThrottleBandwidth", 2000); + if (mFetcher->getTextureBandwidth() > throttle_bandwidth) + { + return false; // wait + } +#endif + + // Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate. + + { + command_queue_rat command_queue_r(command_queue); + S32 const pipelined_requests = command_queue_r->size + sTotalQueued + MultiHandle::total_added_size(); + // We can't take the command being processed (command_being_processed) into account without + // introducing relatively long waiting times for some mutex (namely between when the command + // is moved from command_queue to command_being_processed, till it's actually being added to + // mAddedEasyRequests). 
The whole purpose of command_being_processed is to reduce the time + // that things are locked to micro seconds, so we'll just accept an off-by-one fuzziness + // here instead. + + // The maximum number of requests that may be queued in command_queue is equal to the total number of requests + // that may exist in the pipeline minus the number of requests queued in AIPerHostRequestQueue objects, minus + // the number of already running requests. + reject = pipelined_requests >= max_pipelined_requests_cache; + equal = pipelined_requests == max_pipelined_requests_cache; + increment_threshold = sRequestStarvation; + } + if (increment_threshold && reject) + { + if (max_pipelined_requests_cache < 2 * curl_max_total_concurrent_connections) + { + max_pipelined_requests++; + // Immediately take the new threshold into account. + reject = !equal; + } + } + return !reject; +} + diff --git a/indra/newview/app_settings/settings.xml b/indra/newview/app_settings/settings.xml index 4de95b255..0c3da9fba 100644 --- a/indra/newview/app_settings/settings.xml +++ b/indra/newview/app_settings/settings.xml @@ -171,29 +171,7 @@ Value 1 - - HTTPMaxRequests - - Comment - Maximum number of simultaneous HTTP requests in progress. - Persist - 1 - Type - U32 - Value - 12 - - HTTPMinRequests - - Comment - Attempt to maintain at least this many HTTP requests in progress by ignoring bandwidth - Persist - 1 - Type - U32 - Value - 2 - + HTTPThrottleBandwidth Comment diff --git a/indra/newview/lltexturefetch.cpp b/indra/newview/lltexturefetch.cpp index b7d5d3a2b..4da161c29 100644 --- a/indra/newview/lltexturefetch.cpp +++ b/indra/newview/lltexturefetch.cpp @@ -1248,29 +1248,14 @@ bool LLTextureFetchWorker::doWork(S32 param) { if(mCanUseHTTP) { - //NOTE: - //control the number of the http requests issued for: - //1, not opening too many file descriptors at the same time; - //2, control the traffic of http so udp gets bandwidth. 
- // - static const LLCachedControl max_http_requests("HTTPMaxRequests", 8); - static const LLCachedControl min_http_requests("HTTPMinRequests", 2); - static const LLCachedControl throttle_bandwidth("HTTPThrottleBandwidth", 2000); - if(((U32)mFetcher->getNumHTTPRequests() >= max_http_requests) || - ((mFetcher->getTextureBandwidth() > throttle_bandwidth) && - ((U32)mFetcher->getNumHTTPRequests() > min_http_requests))) - { - return false ; //wait. - } - - mFetcher->removeFromNetworkQueue(this, false); - S32 cur_size = 0; if (mFormattedImage.notNull()) { cur_size = mFormattedImage->getDataSize(); // amount of data we already have if (mFormattedImage->getDiscardLevel() == 0) { + // Already have all data. + mFetcher->removeFromNetworkQueue(this, false); // Not sure this is necessary, but it's what the old code did --Aleric if(cur_size > 0) { // We already have all the data, just decode it @@ -1284,10 +1269,19 @@ bool LLTextureFetchWorker::doWork(S32 param) } } } + + // Let AICurl decide if we can process more HTTP requests at the moment or not. + if (!AIPerHostRequestQueue::wantsMoreHTTPRequestsFor(mPerHostPtr)) + { + return false ; //wait. + } + + mFetcher->removeFromNetworkQueue(this, false); + + mRequestedSize = mDesiredSize - cur_size; mRequestedDiscard = mDesiredDiscard; mRequestedOffset = cur_size; - + bool res = false; if (!mUrl.empty()) {