Move decision whether or not to add new HTTP request from texture fetcher to AICurl

After commit things compile again :).
The HTTP bandwidth throttling is not yet implemented. I'll put back a
temporary fix in the next commit that just does it the "old way"...
This commit is contained in:
Aleric Inglewood
2013-04-08 22:46:01 +02:00
parent 8d6f5c6ffc
commit 748d339ee6
6 changed files with 199 additions and 46 deletions

View File

@@ -52,6 +52,7 @@
#include "stdtypes.h" // U16, S32, U32, F64
#include "llatomic.h" // LLAtomicU32
#include "aithreadsafe.h"
#include "aicurlperhost.h" // AIPerHostRequestQueuePtr
// Debug Settings.
extern bool gNoVerifySSLCert;

View File

@@ -38,6 +38,9 @@
AIPerHostRequestQueue::threadsafe_instance_map_type AIPerHostRequestQueue::sInstanceMap;
LLAtomicS32 AIPerHostRequestQueue::sTotalQueued;
bool AIPerHostRequestQueue::sQueueEmpty;
bool AIPerHostRequestQueue::sQueueFull;
bool AIPerHostRequestQueue::sRequestStarvation;
#undef AICurlPrivate
@@ -221,8 +224,37 @@ void AIPerHostRequestQueue::add_queued_to(curlthread::MultiHandle* multi_handle)
{
multi_handle->add_easy_request(mQueuedRequests.front());
mQueuedRequests.pop_front();
--sTotalQueued;
llassert(sTotalQueued >= 0);
llassert(sTotalQueued > 0);
if (!--sTotalQueued)
{
// We obtained a request from the queue, and after that there were no more requests in any queue.
sQueueEmpty = true;
}
else
{
// We obtained a request from the queue, and even after that there was at least one more request in some queue.
sQueueFull = true;
}
if (mQueuedRequests.empty())
{
// We obtained a request from the queue, and after that there were no more requests in the queue of this host.
mQueueEmpty = true;
}
else
{
// We obtained a request from the queue, and even after that there was at least one more request in the queue of this host.
mQueueFull = true;
}
}
else
{
// We can add a new request, but there is none in the queue!
mRequestStarvation = true;
if (sTotalQueued == 0)
{
// The queue of every host is empty!
sRequestStarvation = true;
}
}
}

View File

@@ -85,7 +85,7 @@ class AIPerHostRequestQueue {
static threadsafe_instance_map_type sInstanceMap; // Map of AIPerHostRequestQueue instances with the hostname as key.
friend class AIThreadSafeSimpleDC<AIPerHostRequestQueue>; //threadsafe_PerHostRequestQueue
AIPerHostRequestQueue(void) : mQueuedCommands(0), mAdded(0) { }
AIPerHostRequestQueue(void) : mQueuedCommands(0), mAdded(0), mQueueEmpty(false), mQueueFull(false), mRequestStarvation(false) { }
public:
typedef instance_map_type::iterator iterator;
@@ -112,6 +112,14 @@ class AIPerHostRequestQueue {
static LLAtomicS32 sTotalQueued; // The sum of mQueuedRequests.size() of all AIPerHostRequestQueue objects together.
bool mQueueEmpty; // Set to true when the queue becomes precisely empty.
bool mQueueFull; // Set to true when the queue is popped and then still isn't empty.
bool mRequestStarvation; // Set to true when the queue was about to be popped but was already empty.
static bool sQueueEmpty; // Set to true when sTotalQueued becomes precisely zero as the result of popping any queue.
static bool sQueueFull; // Set to true when sTotalQueued is still larger than zero after popping any queue.
static bool sRequestStarvation; // Set to true when any queue was about to be popped when sTotalQueued was already zero.
public:
void added_to_command_queue(void) { ++mQueuedCommands; }
void removed_from_command_queue(void) { --mQueuedCommands; llassert(mQueuedCommands >= 0); }
@@ -126,10 +134,14 @@ class AIPerHostRequestQueue {
// Add queued easy handle (if any) to the multi handle. The request is removed from the queue,
// followed by either a call to added_to_multi_handle() or to queue() to add it back.
S32 queued_commands(void) const { return mQueuedCommands; }
S32 host_queued_plus_added_size(void) const { return mQueuedRequests.size() + mAdded; }
S32 pipelined_requests(void) const { return mQueuedCommands + mQueuedRequests.size() + mAdded; }
static S32 total_queued_size(void) { return sTotalQueued; }
// Returns true if curl can handle another request for this host.
// Should return false if the maximum allowed HTTP bandwidth is reached, or when
// the latency between request and actual delivery becomes too large.
static bool wantsMoreHTTPRequestsFor(AIPerHostRequestQueuePtr const& per_host);
private:
// Disallow copying.
AIPerHostRequestQueue(AIPerHostRequestQueue const&) { }

View File

@@ -204,6 +204,9 @@ int ioctlsocket(int fd, int, unsigned long* nonblocking_enable)
namespace AICurlPrivate {
LLAtomicS32 max_pipelined_requests(32);
LLAtomicS32 max_pipelined_requests_per_host(8);
enum command_st {
cmd_none,
cmd_add,
@@ -2476,6 +2479,8 @@ void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentCo
curl_max_total_concurrent_connections = CurlMaxTotalConcurrentConnections;
curl_concurrent_connections_per_host = CurlConcurrentConnectionsPerHost;
gNoVerifySSLCert = NoVerifySSLCert;
max_pipelined_requests = curl_max_total_concurrent_connections;
max_pipelined_requests_per_host = curl_concurrent_connections_per_host;
AICurlThread::sInstance = new AICurlThread;
AICurlThread::sInstance->start();
@@ -2483,9 +2488,12 @@ void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentCo
bool handleCurlMaxTotalConcurrentConnections(LLSD const& newvalue)
{
using namespace AICurlPrivate;
using namespace AICurlPrivate::curlthread;
U32 old = curl_max_total_concurrent_connections;
curl_max_total_concurrent_connections = newvalue.asInteger();
max_pipelined_requests += curl_max_total_concurrent_connections - old;
llinfos << "CurlMaxTotalConcurrentConnections set to " << curl_max_total_concurrent_connections << llendl;
return true;
}
@@ -2494,7 +2502,9 @@ bool handleCurlConcurrentConnectionsPerHost(LLSD const& newvalue)
{
using namespace AICurlPrivate;
U32 old = curl_concurrent_connections_per_host;
curl_concurrent_connections_per_host = newvalue.asInteger();
max_pipelined_requests_per_host += curl_concurrent_connections_per_host - old;
llinfos << "CurlConcurrentConnectionsPerHost set to " << curl_concurrent_connections_per_host << llendl;
return true;
}
@@ -2525,3 +2535,129 @@ U32 getNumHTTPAdded(void)
} // namespace AICurlInterface
// Return true if we want at least one more HTTP request for this host.
//
// It's OK if this function is a bit fuzzy, but we don't want it to return
// true a hundred times in a row when it is called fast in a loop.
// Hence the following consideration:
//
// This function is called only from LLTextureFetchWorker::doWork, and when it returns true
// then doWork will call LLHTTPClient::request with a NULL default engine (signaling that
// it is OK to run in any thread).
//
// At the end, LLHTTPClient::request calls AIStateMachine::run, which in turn calls
// AIStateMachine::reset at the end. Because NULL is passed as default_engine, reset will
// call AIStateMachine::multiplex to immediately start running the state machine. This
// causes it to go through the states bs_reset, bs_initialize and then bs_multiplex with
// run state AICurlEasyRequestStateMachine_addRequest. Finally, in this state, multiplex
// calls AICurlEasyRequestStateMachine::multiplex_impl which then calls AICurlEasyRequest::addRequest
// which causes an increment of command_queue_w->size and AIPerHostRequestQueue::mQueuedCommands.
//
// It is therefore guaranteed that in one loop of LLTextureFetchWorker::doWork,
// this size is incremented; stopping this function from returning true once we reached the
// threshold of "pipelined" requests (the sum of requests in the command queue, the ones
// throttled and queued in AIPerHostRequestQueue::mQueuedRequests and the already
// running requests (in MultiHandle::mAddedEasyRequests)).
//
//static
bool AIPerHostRequestQueue::wantsMoreHTTPRequestsFor(AIPerHostRequestQueuePtr const& per_host)
{
using namespace AICurlPrivate;
using namespace AICurlPrivate::curlthread;
// reject              : the relevant pipelined-request count has reached the (cached) threshold.
// equal               : the count is exactly AT the threshold; used to re-evaluate 'reject' after bumping the threshold by one.
// increment_threshold : a queue ran dry while curl could have taken more work (starvation) -> threshold may grow.
// decrement_threshold : a queue was popped and still wasn't empty (backlog) -> threshold may shrink.
bool reject, equal, increment_threshold, decrement_threshold;
// Whether or not we're going to approve a new request, decrement the global threshold first, when appropriate.
// Atomic read max_pipelined_requests for the below calculations.
S32 const max_pipelined_requests_cache = max_pipelined_requests;
decrement_threshold = sQueueFull && !sQueueEmpty;
// Reset flags.
// NOTE(review): sQueueEmpty/sQueueFull are plain (non-atomic) statics written by the curl thread
// (see add_queued_to) and read-then-cleared here without a lock; presumably this is called from a
// single consumer thread and the races are accepted as "fuzziness" -- TODO confirm.
sQueueEmpty = sQueueFull = false;
if (decrement_threshold)
{
// Never shrink below the configured total connection limit.
if (max_pipelined_requests_cache > curl_max_total_concurrent_connections)
{
// Decrement the threshold because since the last call to this function at least one curl request finished
// and was replaced with another request from the queue, but the queue never ran empty: we have too many
// queued requests.
--max_pipelined_requests;
}
}
// Check if it's ok to get a new request for this particular host and update the per-host threshold.
// Atomic read max_pipelined_requests_per_host for the below calculations.
S32 const max_pipelined_requests_per_host_cache = max_pipelined_requests_per_host;
{
// Scope for the per-host read-access lock; keep it as short as possible.
PerHostRequestQueue_rat per_host_r(*per_host);
S32 const pipelined_requests_per_host = per_host_r->pipelined_requests();
reject = pipelined_requests_per_host >= max_pipelined_requests_per_host_cache;
equal = pipelined_requests_per_host == max_pipelined_requests_per_host_cache;
increment_threshold = per_host_r->mRequestStarvation;
decrement_threshold = per_host_r->mQueueFull && !per_host_r->mQueueEmpty;
// Reset flags.
per_host_r->mQueueFull = per_host_r->mQueueEmpty = per_host_r->mRequestStarvation = false;
}
if (decrement_threshold)
{
// Never shrink below the configured per-host connection limit.
if (max_pipelined_requests_per_host_cache > curl_concurrent_connections_per_host)
{
--max_pipelined_requests_per_host;
}
}
else if (increment_threshold && reject)
{
// The per-host queue starved while we are at the limit: let the threshold grow,
// capped at twice the configured per-host connection limit.
if (max_pipelined_requests_per_host_cache < 2 * curl_concurrent_connections_per_host)
{
max_pipelined_requests_per_host++;
// Immediately take the new threshold into account.
reject = !equal;
}
}
if (reject)
{
// Too many requests for this host already.
return false;
}
#if 0
//AITODO: better bandwidth check here.
static const LLCachedControl<F32> throttle_bandwidth("HTTPThrottleBandwidth", 2000);
if (mFetcher->getTextureBandwidth() > throttle_bandwidth)
{
return false; // wait
}
#endif
// Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate.
{
command_queue_rat command_queue_r(command_queue);
S32 const pipelined_requests = command_queue_r->size + sTotalQueued + MultiHandle::total_added_size();
// We can't take the command being processed (command_being_processed) into account without
// introducing relatively long waiting times for some mutex (namely between when the command
// is moved from command_queue to command_being_processed, till it's actually being added to
// mAddedEasyRequests). The whole purpose of command_being_processed is to reduce the time
// that things are locked to micro seconds, so we'll just accept an off-by-one fuzziness
// here instead.
// The maximum number of requests that may be queued in command_queue is equal to the total number of requests
// that may exist in the pipeline minus the number of requests queued in AIPerHostRequestQueue objects, minus
// the number of already running requests.
reject = pipelined_requests >= max_pipelined_requests_cache;
equal = pipelined_requests == max_pipelined_requests_cache;
// NOTE(review): unlike the per-host mRequestStarvation and the static sQueueEmpty/sQueueFull
// flags above, sRequestStarvation is read here but never reset in this function -- verify it is
// cleared elsewhere, otherwise a single starvation event keeps growing the global threshold
// (up to its 2x cap) on every call that hits the limit.
increment_threshold = sRequestStarvation;
}
if (increment_threshold && reject)
{
// Global starvation while at the limit: let the total threshold grow,
// capped at twice the configured total connection limit.
if (max_pipelined_requests_cache < 2 * curl_max_total_concurrent_connections)
{
max_pipelined_requests++;
// Immediately take the new threshold into account.
reject = !equal;
}
}
return !reject;
}

View File

@@ -171,29 +171,7 @@
<key>Value</key>
<integer>1</integer>
</map>
<key>HTTPMaxRequests</key>
<map>
<key>Comment</key>
<string>Maximum number of simultaneous HTTP requests in progress.</string>
<key>Persist</key>
<integer>1</integer>
<key>Type</key>
<string>U32</string>
<key>Value</key>
<integer>12</integer>
</map>
<key>HTTPMinRequests</key>
<map>
<key>Comment</key>
<string>Attempt to maintain at least this many HTTP requests in progress by ignoring bandwidth</string>
<key>Persist</key>
<integer>1</integer>
<key>Type</key>
<string>U32</string>
<key>Value</key>
<integer>2</integer>
</map>
<key>HTTPThrottleBandwidth</key>
<map>
<key>Comment</key>

View File

@@ -1248,29 +1248,14 @@ bool LLTextureFetchWorker::doWork(S32 param)
{
if(mCanUseHTTP)
{
//NOTE:
//control the number of the http requests issued for:
//1, not opening too many file descriptors at the same time;
//2, control the traffic of http so udp gets bandwidth.
//
static const LLCachedControl<U32> max_http_requests("HTTPMaxRequests", 8);
static const LLCachedControl<U32> min_http_requests("HTTPMinRequests", 2);
static const LLCachedControl<F32> throttle_bandwidth("HTTPThrottleBandwidth", 2000);
if(((U32)mFetcher->getNumHTTPRequests() >= max_http_requests) ||
((mFetcher->getTextureBandwidth() > throttle_bandwidth) &&
((U32)mFetcher->getNumHTTPRequests() > min_http_requests)))
{
return false ; //wait.
}
mFetcher->removeFromNetworkQueue(this, false);
S32 cur_size = 0;
if (mFormattedImage.notNull())
{
cur_size = mFormattedImage->getDataSize(); // amount of data we already have
if (mFormattedImage->getDiscardLevel() == 0)
{
// Already have all data.
mFetcher->removeFromNetworkQueue(this, false); // Note sure this is necessary, but it's what the old did --Aleric
if(cur_size > 0)
{
// We already have all the data, just decode it
@@ -1284,10 +1269,19 @@ bool LLTextureFetchWorker::doWork(S32 param)
}
}
}
// Let AICurl decide if we can process more HTTP requests at the moment or not.
if (!AIPerHostRequestQueue::wantsMoreHTTPRequestsFor(mPerHostPtr))
{
return false ; //wait.
}
mFetcher->removeFromNetworkQueue(this, false);
mRequestedSize = mDesiredSize - cur_size;
mRequestedDiscard = mDesiredDiscard;
mRequestedOffset = cur_size;
bool res = false;
if (!mUrl.empty())
{