AIPerService improvements.

* Removed the 'RequestQueue' from other PerServiceRequestQueue occurrences
  in the code.
* Made wantsMoreHTTPRequestsFor and checkBandwidthUsage threadsafe (by
  grouping the static variables of AIPerService into ThreadSafe
  groups).
This commit is contained in:
Aleric Inglewood
2013-05-07 16:10:09 +02:00
parent 410f9d92a5
commit e3fec7c715
4 changed files with 163 additions and 137 deletions

View File

@@ -1251,7 +1251,7 @@ AIPerServicePtr CurlEasyRequest::getPerServicePtr(void)
bool CurlEasyRequest::removeFromPerServiceQueue(AICurlEasyRequest const& easy_request) const
{
// Note that easy_request (must) represent(s) this object; it's just passed for convenience.
return mPerServicePtr && PerServiceRequestQueue_wat(*mPerServicePtr)->cancel(easy_request);
return mPerServicePtr && PerService_wat(*mPerServicePtr)->cancel(easy_request);
}
std::string CurlEasyRequest::getLowercaseHostname(void) const

View File

@@ -42,10 +42,7 @@
#include "llcontrol.h"
AIPerService::threadsafe_instance_map_type AIPerService::sInstanceMap;
LLAtomicS32 AIPerService::sTotalQueued;
bool AIPerService::sQueueEmpty;
bool AIPerService::sQueueFull;
bool AIPerService::sRequestStarvation;
AIThreadSafeSimpleDC<AIPerService::TotalQueued> AIPerService::sTotalQueued;
#undef AICurlPrivate
@@ -54,14 +51,14 @@ namespace AICurlPrivate {
// Cached value of CurlConcurrentConnectionsPerService.
U32 CurlConcurrentConnectionsPerService;
// Friend functions of RefCountedThreadSafePerServiceRequestQueue
// Friend functions of RefCountedThreadSafePerService
void intrusive_ptr_add_ref(RefCountedThreadSafePerServiceRequestQueue* per_service)
void intrusive_ptr_add_ref(RefCountedThreadSafePerService* per_service)
{
per_service->mReferenceCount++;
}
void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* per_service)
void intrusive_ptr_release(RefCountedThreadSafePerService* per_service)
{
if (--per_service->mReferenceCount == 0)
{
@@ -194,7 +191,7 @@ AIPerServicePtr AIPerService::instance(std::string const& servicename)
AIPerService::iterator iter = instance_map_w->find(servicename);
if (iter == instance_map_w->end())
{
iter = instance_map_w->insert(instance_map_type::value_type(servicename, new RefCountedThreadSafePerServiceRequestQueue)).first;
iter = instance_map_w->insert(instance_map_type::value_type(servicename, new RefCountedThreadSafePerService)).first;
}
// Note: the creation of AIPerServicePtr MUST be protected by the lock on sInstanceMap (see release()).
return iter->second;
@@ -219,7 +216,7 @@ void AIPerService::release(AIPerServicePtr& instance)
return;
}
// The reference in the map is the last one; that means there can't be any curl easy requests queued for this host.
llassert(PerServiceRequestQueue_rat(*instance)->mQueuedRequests.empty());
llassert(PerService_rat(*instance)->mQueuedRequests.empty());
// Find the host and erase it from the map.
iterator const end = instance_map_w->end();
for(iterator iter = instance_map_w->begin(); iter != end; ++iter)
@@ -256,7 +253,7 @@ void AIPerService::removed_from_multi_handle(void)
void AIPerService::queue(AICurlEasyRequest const& easy_request)
{
mQueuedRequests.push_back(easy_request.get_ptr());
sTotalQueued++;
TotalQueued_wat(sTotalQueued)->count++;
}
bool AIPerService::cancel(AICurlEasyRequest const& easy_request)
@@ -280,8 +277,9 @@ bool AIPerService::cancel(AICurlEasyRequest const& easy_request)
prev = cur;
}
mQueuedRequests.pop_back(); // if this is safe.
--sTotalQueued;
llassert(sTotalQueued >= 0);
TotalQueued_wat total_queued_w(sTotalQueued);
total_queued_w->count--;
llassert(total_queued_w->count >= 0);
return true;
}
@@ -291,17 +289,6 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle)
{
multi_handle->add_easy_request(mQueuedRequests.front());
mQueuedRequests.pop_front();
llassert(sTotalQueued > 0);
if (!--sTotalQueued)
{
// We obtained a request from the queue, and after that there we no more request in any queue.
sQueueEmpty = true;
}
else
{
// We obtained a request from the queue, and even after that there was at least one more request in some queue.
sQueueFull = true;
}
if (mQueuedRequests.empty())
{
// We obtained a request from the queue, and after that there we no more request in the queue of this host.
@@ -312,15 +299,28 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle)
// We obtained a request from the queue, and even after that there was at least one more request in the queue of this host.
mQueueFull = true;
}
TotalQueued_wat total_queued_w(sTotalQueued);
llassert(total_queued_w->count > 0);
if (!--(total_queued_w->count))
{
// We obtained a request from the queue, and after that there we no more request in any queue.
total_queued_w->empty = true;
}
else
{
// We obtained a request from the queue, and even after that there was at least one more request in some queue.
total_queued_w->full = true;
}
}
else
{
// We can add a new request, but there is none in the queue!
mRequestStarvation = true;
if (sTotalQueued == 0)
TotalQueued_wat total_queued_w(sTotalQueued);
if (total_queued_w->count == 0)
{
// The queue of every host is empty!
sRequestStarvation = true;
total_queued_w->starvation = true;
}
}
}
@@ -332,11 +332,12 @@ void AIPerService::purge(void)
for (iterator host = instance_map_w->begin(); host != instance_map_w->end(); ++host)
{
Dout(dc::curl, "Purging queue of host \"" << host->first << "\".");
PerServiceRequestQueue_wat per_service_w(*host->second);
PerService_wat per_service_w(*host->second);
size_t s = per_service_w->mQueuedRequests.size();
per_service_w->mQueuedRequests.clear();
sTotalQueued -= s;
llassert(sTotalQueued >= 0);
TotalQueued_wat total_queued_w(sTotalQueued);
total_queued_w->count -= s;
llassert(total_queued_w->count >= 0);
}
}
@@ -346,7 +347,7 @@ void AIPerService::adjust_concurrent_connections(int increment)
instance_map_wat instance_map_w(sInstanceMap);
for (AIPerService::iterator iter = instance_map_w->begin(); iter != instance_map_w->end(); ++iter)
{
PerServiceRequestQueue_wat per_service_w(*iter->second);
PerService_wat per_service_w(*iter->second);
U32 old_concurrent_connections = per_service_w->mConcurrectConnections;
per_service_w->mConcurrectConnections = llclamp(old_concurrent_connections + increment, (U32)1, CurlConcurrentConnectionsPerService);
increment = per_service_w->mConcurrectConnections - old_concurrent_connections;

View File

@@ -53,7 +53,7 @@ class AIPerService;
namespace AICurlPrivate {
namespace curlthread { class MultiHandle; }
class RefCountedThreadSafePerServiceRequestQueue;
class RefCountedThreadSafePerService;
class ThreadSafeBufferedCurlEasyRequest;
// Forward declaration of BufferedCurlEasyRequestPtr (see aicurlprivate.h).
@@ -61,16 +61,16 @@ typedef boost::intrusive_ptr<ThreadSafeBufferedCurlEasyRequest> BufferedCurlEasy
// AIPerService objects are created by the curl thread and destructed by the main thread.
// We need locking.
typedef AIThreadSafeSimpleDC<AIPerService> threadsafe_PerServiceRequestQueue;
typedef AIAccessConst<AIPerService> PerServiceRequestQueue_crat;
typedef AIAccess<AIPerService> PerServiceRequestQueue_rat;
typedef AIAccess<AIPerService> PerServiceRequestQueue_wat;
typedef AIThreadSafeSimpleDC<AIPerService> threadsafe_PerService;
typedef AIAccessConst<AIPerService> PerService_crat;
typedef AIAccess<AIPerService> PerService_rat;
typedef AIAccess<AIPerService> PerService_wat;
} // namespace AICurlPrivate
// We can't put threadsafe_PerServiceRequestQueue in a std::map because you can't copy a mutex.
// We can't put threadsafe_PerService in a std::map because you can't copy a mutex.
// Therefore, use an intrusive pointer for the threadsafe type.
typedef boost::intrusive_ptr<AICurlPrivate::RefCountedThreadSafePerServiceRequestQueue> AIPerServicePtr;
typedef boost::intrusive_ptr<AICurlPrivate::RefCountedThreadSafePerService> AIPerServicePtr;
//-----------------------------------------------------------------------------
// AIPerService
@@ -89,7 +89,7 @@ class AIPerService {
static threadsafe_instance_map_type sInstanceMap; // Map of AIPerService instances with the hostname as key.
friend class AIThreadSafeSimpleDC<AIPerService>; // threadsafe_PerServiceRequestQueue
friend class AIThreadSafeSimpleDC<AIPerService>; // threadsafe_PerService
AIPerService(void);
public:
@@ -118,27 +118,60 @@ class AIPerService {
int mAdded; // Number of active easy handles with this host.
queued_request_type mQueuedRequests; // Waiting (throttled) requests.
static LLAtomicS32 sTotalQueued; // The sum of mQueuedRequests.size() of all AIPerService objects together.
bool mQueueEmpty; // Set to true when the queue becomes precisely empty.
bool mQueueFull; // Set to true when the queue is popped and then still isn't empty;
bool mRequestStarvation; // Set to true when the queue was about to be popped but was already empty.
static bool sQueueEmpty; // Set to true when sTotalQueued becomes precisely zero as the result of popping any queue.
static bool sQueueFull; // Set to true when sTotalQueued is still larger than zero after popping any queue.
static bool sRequestStarvation; // Set to true when any queue was about to be popped when sTotalQueued was already zero.
AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second.
int mConcurrectConnections; // The maximum number of allowed concurrent connections to this service.
int mMaxPipelinedRequests; // The maximum number of accepted requests for this service that didn't finish yet.
static LLAtomicS32 sMaxPipelinedRequests; // The maximum total number of accepted requests that didn't finish yet.
static U64 sLastTime_sMaxPipelinedRequests_increment; // Last time that sMaxPipelinedRequests was incremented.
static U64 sLastTime_sMaxPipelinedRequests_decrement; // Last time that sMaxPipelinedRequests was decremented.
static U64 sLastTime_ThrottleFractionAverage_add; // Last time that sThrottleFraction was added to sThrottleFractionAverage.
static AIThreadSafeSimpleDC<size_t> sThrottleFraction; // A value between 0 and 1024: each service is throttled when it uses more than max_bandwidth * (sThrottleFraction/1024) bandwidth.
static AIAverage sThrottleFractionAverage; // Average of sThrottleFraction over 25 * 40ms = 1 second.
static size_t sHTTPThrottleBandwidth125; // HTTPThrottleBandwidth times 125 (in bytes/s).
// Global administration of the total number of queued requests of all services combined.
private:
struct TotalQueued {
S32 count; // The sum of mQueuedRequests.size() of all AIPerService objects together.
bool empty; // Set to true when count becomes precisely zero as the result of popping any queue.
bool full; // Set to true when count is still larger than zero after popping any queue.
bool starvation; // Set to true when any queue was about to be popped when count was already zero.
TotalQueued(void) : count(0), empty(false), full(false), starvation(false) { }
};
static AIThreadSafeSimpleDC<TotalQueued> sTotalQueued;
typedef AIAccessConst<TotalQueued> TotalQueued_crat;
typedef AIAccess<TotalQueued> TotalQueued_rat;
typedef AIAccess<TotalQueued> TotalQueued_wat;
public:
static S32 total_queued_size(void) { return TotalQueued_rat(sTotalQueued)->count; }
// Global administration of the maximum number of pipelined requests of all services combined.
private:
struct MaxPipelinedRequests {
S32 count; // The maximum total number of accepted requests that didn't finish yet.
U64 last_increment; // Last time that sMaxPipelinedRequests was incremented.
U64 last_decrement; // Last time that sMaxPipelinedRequests was decremented.
MaxPipelinedRequests(void) : count(32), last_increment(0), last_decrement(0) { }
};
static AIThreadSafeSimpleDC<MaxPipelinedRequests> sMaxPipelinedRequests;
typedef AIAccessConst<MaxPipelinedRequests> MaxPipelinedRequests_crat;
typedef AIAccess<MaxPipelinedRequests> MaxPipelinedRequests_rat;
typedef AIAccess<MaxPipelinedRequests> MaxPipelinedRequests_wat;
public:
static void setMaxPipelinedRequests(S32 count) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count = count; }
static void incrementMaxPipelinedRequests(S32 increment) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count += increment; }
// Global administration of throttle fraction (which is the same for all services).
private:
struct ThrottleFraction {
U32 fraction; // A value between 0 and 1024: each service is throttled when it uses more than max_bandwidth * (sThrottleFraction/1024) bandwidth.
AIAverage average; // Average of fraction over 25 * 40ms = 1 second.
U64 last_add; // Last time that faction was added to average.
ThrottleFraction(void) : fraction(1024), average(25), last_add(0) { }
};
static AIThreadSafeSimpleDC<ThrottleFraction> sThrottleFraction;
typedef AIAccessConst<ThrottleFraction> ThrottleFraction_crat;
typedef AIAccess<ThrottleFraction> ThrottleFraction_rat;
typedef AIAccess<ThrottleFraction> ThrottleFraction_wat;
static LLAtomicU32 sHTTPThrottleBandwidth125; // HTTPThrottleBandwidth times 125 (in bytes/s).
static bool sNoHTTPBandwidthThrottling; // Global override to disable bandwidth throttling.
public:
@@ -156,7 +189,6 @@ class AIPerService {
// followed by either a call to added_to_multi_handle() or to queue() to add it back.
S32 pipelined_requests(void) const { return mQueuedCommands + mQueuedRequests.size() + mAdded; }
static S32 total_queued_size(void) { return sTotalQueued; }
AIAverage& bandwidth(void) { return mHTTPBandwidth; }
AIAverage const& bandwidth(void) const { return mHTTPBandwidth; }
@@ -168,15 +200,17 @@ class AIPerService {
// Called when CurlConcurrentConnectionsPerService changes.
static void adjust_concurrent_connections(int increment);
// The two following functions are static and have the AIPerService object passed
// as first argument as an AIPerServicePtr because that avoids the need of having
// the AIPerService object locked for the whole duration of the call.
// The functions only lock it when access is required.
// Returns true if curl can handle another request for this host.
// Should return false if the maximum allowed HTTP bandwidth is reached, or when
// the latency between request and actual delivery becomes too large.
static bool wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service);
// Return true if too much bandwidth is being used.
static bool checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth_ptr);
// Accessor for when curl_max_total_concurrent_connections changes.
static LLAtomicS32& maxPipelinedRequests(void) { return sMaxPipelinedRequests; }
static bool checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms);
private:
// Disallow copying.
@@ -185,17 +219,17 @@ class AIPerService {
namespace AICurlPrivate {
class RefCountedThreadSafePerServiceRequestQueue : public threadsafe_PerServiceRequestQueue {
class RefCountedThreadSafePerService : public threadsafe_PerService {
public:
RefCountedThreadSafePerServiceRequestQueue(void) : mReferenceCount(0) { }
RefCountedThreadSafePerService(void) : mReferenceCount(0) { }
bool exactly_two_left(void) const { return mReferenceCount == 2; }
private:
// Used by AIPerServicePtr. Object is deleted when reference count reaches zero.
LLAtomicU32 mReferenceCount;
friend void intrusive_ptr_add_ref(RefCountedThreadSafePerServiceRequestQueue* p);
friend void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* p);
friend void intrusive_ptr_add_ref(RefCountedThreadSafePerService* p);
friend void intrusive_ptr_release(RefCountedThreadSafePerService* p);
};
extern U32 CurlConcurrentConnectionsPerService;

View File

@@ -1331,11 +1331,11 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w)
case cmd_boost: // FIXME: future stuff
break;
case cmd_add:
PerServiceRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue();
PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue();
multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()));
break;
case cmd_remove:
PerServiceRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'.
PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'.
multi_handle_w->remove_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), true);
break;
}
@@ -1708,10 +1708,8 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
{
AICurlEasyRequest_wat curl_easy_request_w(*easy_request);
per_service = curl_easy_request_w->getPerServicePtr();
bool too_much_bandwidth = curl_easy_request_w->queueIfTooMuchBandwidthUsage() &&
AIPerService::checkBandwidthUsage(get_clock_count() * HTTPTimeout::sClockWidth_40ms,
PerServiceRequestQueue_wat(*per_service)->bandwidth());
PerServiceRequestQueue_wat per_service_w(*per_service);
bool too_much_bandwidth = curl_easy_request_w->queueIfTooMuchBandwidthUsage() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms);
PerService_wat per_service_w(*per_service);
if (!too_much_bandwidth && mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled())
{
curl_easy_request_w->set_timeout_opts();
@@ -1736,7 +1734,7 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
return;
}
// The request could not be added, we have to queue it.
PerServiceRequestQueue_wat(*per_service)->queue(easy_request);
PerService_wat(*per_service)->queue(easy_request);
#ifdef SHOW_ASSERT
// Not active yet, but it's no longer an error if next we try to remove the request.
AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = false;
@@ -1774,7 +1772,7 @@ CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator cons
AICurlEasyRequest_wat curl_easy_request_w(**iter);
res = curl_easy_request_w->remove_handle_from_multi(curl_easy_request_w, mMultiHandle);
per_service = curl_easy_request_w->getPerServicePtr();
PerServiceRequestQueue_wat(*per_service)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests.
PerService_wat(*per_service)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests.
#ifdef SHOW_ASSERT
curl_easy_request_w->mRemovedPerCommand = as_per_command;
#endif
@@ -1791,7 +1789,7 @@ CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator cons
#endif
// Attempt to add a queued request, if any.
PerServiceRequestQueue_wat(*per_service)->add_queued_to(this);
PerService_wat(*per_service)->add_queued_to(this);
return res;
}
@@ -2124,7 +2122,7 @@ void BufferedCurlEasyRequest::update_body_bandwidth(void)
if (raw_bytes > 0)
{
U64 const sTime_40ms = curlthread::HTTPTimeout::sTime_10ms >> 2;
AIAverage& http_bandwidth(PerServiceRequestQueue_wat(*getPerServicePtr())->bandwidth());
AIAverage& http_bandwidth(PerService_wat(*getPerServicePtr())->bandwidth());
http_bandwidth.addData(raw_bytes, sTime_40ms);
sHTTPBandwidth.addData(raw_bytes, sTime_40ms);
}
@@ -2219,7 +2217,7 @@ size_t BufferedCurlEasyRequest::curlHeaderCallback(char* data, size_t size, size
}
// Update HTTP bandwidth.
U64 const sTime_40ms = curlthread::HTTPTimeout::sTime_10ms >> 2;
AIAverage& http_bandwidth(PerServiceRequestQueue_wat(*self_w->getPerServicePtr())->bandwidth());
AIAverage& http_bandwidth(PerService_wat(*self_w->getPerServicePtr())->bandwidth());
http_bandwidth.addData(header_len, sTime_40ms);
sHTTPBandwidth.addData(header_len, sTime_40ms);
// Update timeout administration. This must be done after the status is already known.
@@ -2425,7 +2423,7 @@ void AICurlEasyRequest::addRequest(void)
command_queue_w->commands.push_back(Command(*this, cmd_add));
command_queue_w->size++;
AICurlEasyRequest_wat curl_easy_request_w(*get());
PerServiceRequestQueue_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue();
PerService_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue();
curl_easy_request_w->add_queued();
}
// Something was added to the queue, wake up the thread to get it.
@@ -2489,7 +2487,7 @@ void AICurlEasyRequest::removeRequest(void)
command_queue_w->commands.push_back(Command(*this, cmd_remove));
command_queue_w->size--;
AICurlEasyRequest_wat curl_easy_request_w(*get());
PerServiceRequestQueue_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(); // Note really, but this has the same effect as 'added a remove command'.
PerService_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(); // Note really, but this has the same effect as 'added a remove command'.
// Suppress warning that would otherwise happen if the callbacks are revoked before the curl thread removed the request.
curl_easy_request_w->remove_queued();
}
@@ -2515,7 +2513,7 @@ void startCurlThread(LLControlGroup* control_group)
curl_max_total_concurrent_connections = sConfigGroup->getU32("CurlMaxTotalConcurrentConnections");
CurlConcurrentConnectionsPerService = sConfigGroup->getU32("CurlConcurrentConnectionsPerService");
gNoVerifySSLCert = sConfigGroup->getBOOL("NoVerifySSLCert");
AIPerService::maxPipelinedRequests() = curl_max_total_concurrent_connections;
AIPerService::setMaxPipelinedRequests(curl_max_total_concurrent_connections);
AIPerService::setHTTPThrottleBandwidth(sConfigGroup->getF32("HTTPThrottleBandwidth"));
AICurlThread::sInstance = new AICurlThread;
@@ -2529,7 +2527,7 @@ bool handleCurlMaxTotalConcurrentConnections(LLSD const& newvalue)
U32 old = curl_max_total_concurrent_connections;
curl_max_total_concurrent_connections = newvalue.asInteger();
AIPerService::maxPipelinedRequests() += curl_max_total_concurrent_connections - old;
AIPerService::incrementMaxPipelinedRequests(curl_max_total_concurrent_connections - old);
llinfos << "CurlMaxTotalConcurrentConnections set to " << curl_max_total_concurrent_connections << llendl;
return true;
}
@@ -2580,13 +2578,9 @@ size_t getHTTPBandwidth(void)
} // namespace AICurlInterface
// Global AIPerService members.
U64 AIPerService::sLastTime_sMaxPipelinedRequests_increment = 0;
U64 AIPerService::sLastTime_sMaxPipelinedRequests_decrement = 0;
LLAtomicS32 AIPerService::sMaxPipelinedRequests(32);
U64 AIPerService::sLastTime_ThrottleFractionAverage_add = 0;
AIThreadSafeSimpleDC<size_t> AIPerService::sThrottleFraction(1024);
AIAverage AIPerService::sThrottleFractionAverage(25);
size_t AIPerService::sHTTPThrottleBandwidth125 = 250000;
AIThreadSafeSimpleDC<AIPerService::MaxPipelinedRequests> AIPerService::sMaxPipelinedRequests;
AIThreadSafeSimpleDC<AIPerService::ThrottleFraction> AIPerService::sThrottleFraction;
LLAtomicU32 AIPerService::sHTTPThrottleBandwidth125(250000);
bool AIPerService::sNoHTTPBandwidthThrottling;
// Return true if we want at least one more HTTP request for this host.
@@ -2619,38 +2613,42 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service)
using namespace AICurlPrivate;
using namespace AICurlPrivate::curlthread;
bool reject, equal, increment_threshold, decrement_threshold;
// Do certain things at most once every 40ms.
U64 const sTime_40ms = get_clock_count() * HTTPTimeout::sClockWidth_40ms; // Time in 40ms units.
// Atomic read sMaxPipelinedRequests for the below calculations.
S32 const max_pipelined_requests_cache = sMaxPipelinedRequests;
// Cache all sTotalQueued info.
bool starvation, decrement_threshold;
S32 total_queued_or_added = MultiHandle::total_added_size();
{
TotalQueued_wat total_queued_w(sTotalQueued);
total_queued_or_added += total_queued_w->count;
starvation = total_queued_w->starvation;
decrement_threshold = total_queued_w->full && !total_queued_w->empty;
total_queued_w->empty = total_queued_w->full = false; // Reset flags.
}
// Whether or not we're going to approve a new request, decrement the global threshold first, when appropriate.
decrement_threshold = sQueueFull && !sQueueEmpty;
sQueueEmpty = sQueueFull = false; // Reset flags.
if (decrement_threshold)
{
if (max_pipelined_requests_cache > (S32)curl_max_total_concurrent_connections &&
sTime_40ms > sLastTime_sMaxPipelinedRequests_decrement)
MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests);
if (max_pipelined_requests_w->count > (S32)curl_max_total_concurrent_connections &&
sTime_40ms > max_pipelined_requests_w->last_decrement)
{
// Decrement the threshold because since the last call to this function at least one curl request finished
// and was replaced with another request from the queue, but the queue never ran empty: we have too many
// queued requests.
--sMaxPipelinedRequests;
max_pipelined_requests_w->count--;
// Do this at most once every 40 ms.
sLastTime_sMaxPipelinedRequests_decrement = sTime_40ms;
max_pipelined_requests_w->last_decrement = sTime_40ms;
}
}
// Check if it's ok to get a new request for this particular service and update the per-service threshold.
AIAverage* http_bandwidth_ptr;
bool reject, equal, increment_threshold;
{
PerServiceRequestQueue_wat per_service_w(*per_service);
PerService_wat per_service_w(*per_service);
S32 const pipelined_requests_per_service = per_service_w->pipelined_requests();
reject = pipelined_requests_per_service >= per_service_w->mMaxPipelinedRequests;
equal = pipelined_requests_per_service == per_service_w->mMaxPipelinedRequests;
@@ -2658,8 +2656,6 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service)
decrement_threshold = per_service_w->mQueueFull && !per_service_w->mQueueEmpty;
// Reset flags.
per_service_w->mQueueFull = per_service_w->mQueueEmpty = per_service_w->mRequestStarvation = false;
// Grab per service bandwidth object.
http_bandwidth_ptr = &per_service_w->bandwidth();
if (decrement_threshold)
{
if (per_service_w->mMaxPipelinedRequests > per_service_w->mConcurrectConnections)
@@ -2684,7 +2680,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service)
}
// Throttle on bandwidth usage.
if (checkBandwidthUsage(sTime_40ms, *http_bandwidth_ptr))
if (checkBandwidthUsage(per_service, sTime_40ms))
{
// Too much bandwidth is being used, either in total or for this service.
return false;
@@ -2692,30 +2688,28 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service)
// Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate.
{
command_queue_rat command_queue_r(command_queue);
S32 const pipelined_requests = command_queue_r->size + sTotalQueued + MultiHandle::total_added_size();
// We can't take the command being processed (command_being_processed) into account without
// introducing relatively long waiting times for some mutex (namely between when the command
// is moved from command_queue to command_being_processed, till it's actually being added to
// mAddedEasyRequests). The whole purpose of command_being_processed is to reduce the time
// that things are locked to micro seconds, so we'll just accept an off-by-one fuzziness
// here instead.
S32 const pipelined_requests = command_queue_rat(command_queue)->size + total_queued_or_added;
// We can't take the command being processed (command_being_processed) into account without
// introducing relatively long waiting times for some mutex (namely between when the command
// is moved from command_queue to command_being_processed, till it's actually being added to
// mAddedEasyRequests). The whole purpose of command_being_processed is to reduce the time
// that things are locked to micro seconds, so we'll just accept an off-by-one fuzziness
// here instead.
// The maximum number of requests that may be queued in command_queue is equal to the total number of requests
// that may exist in the pipeline minus the number of requests queued in AIPerService objects, minus
// the number of already running requests.
reject = pipelined_requests >= max_pipelined_requests_cache;
equal = pipelined_requests == max_pipelined_requests_cache;
increment_threshold = sRequestStarvation;
}
// The maximum number of requests that may be queued in command_queue is equal to the total number of requests
// that may exist in the pipeline minus the number of requests queued in AIPerService objects, minus
// the number of already running requests.
MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests);
reject = pipelined_requests >= max_pipelined_requests_w->count;
equal = pipelined_requests == max_pipelined_requests_w->count;
increment_threshold = starvation;
if (increment_threshold && reject)
{
if (max_pipelined_requests_cache < 2 * (S32)curl_max_total_concurrent_connections &&
sTime_40ms > sLastTime_sMaxPipelinedRequests_increment)
if (max_pipelined_requests_w->count < 2 * (S32)curl_max_total_concurrent_connections &&
sTime_40ms > max_pipelined_requests_w->last_increment)
{
sMaxPipelinedRequests++;
sLastTime_sMaxPipelinedRequests_increment = sTime_40ms;
max_pipelined_requests_w->count++;
max_pipelined_requests_w->last_increment = sTime_40ms;
// Immediately take the new threshold into account.
reject = !equal;
}
@@ -2723,7 +2717,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service)
return !reject;
}
bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth)
bool AIPerService::checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms)
{
if (sNoHTTPBandwidthThrottling)
return false;
@@ -2732,27 +2726,26 @@ bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth
// Truncate the sums to the last second, and get their value.
size_t const max_bandwidth = AIPerService::getHTTPThrottleBandwidth125();
size_t const total_bandwidth = BufferedCurlEasyRequest::sHTTPBandwidth.truncateData(sTime_40ms); // Bytes received in the past second.
size_t const service_bandwidth = http_bandwidth.truncateData(sTime_40ms); // Idem for just this service.
AIAccess<size_t> throttle_fraction_w(sThrottleFraction);
// Note that sLastTime_ThrottleFractionAverage_add is protected by the lock on sThrottleFraction...
if (sTime_40ms > sLastTime_ThrottleFractionAverage_add)
size_t const total_bandwidth = BufferedCurlEasyRequest::sHTTPBandwidth.truncateData(sTime_40ms); // Bytes received in the past second.
size_t const service_bandwidth = PerService_wat(*per_service)->bandwidth().truncateData(sTime_40ms); // Idem for just this service.
ThrottleFraction_wat throttle_fraction_w(sThrottleFraction);
if (sTime_40ms > throttle_fraction_w->last_add)
{
sThrottleFractionAverage.addData(*throttle_fraction_w, sTime_40ms);
// Only add sThrottleFraction once every 40 ms at most.
throttle_fraction_w->average.addData(throttle_fraction_w->fraction, sTime_40ms);
// Only add throttle_fraction_w->fraction once every 40 ms at most.
// It's ok to ignore other values in the same 40 ms because the value only changes on the scale of 1 second.
sLastTime_ThrottleFractionAverage_add = sTime_40ms;
throttle_fraction_w->last_add = sTime_40ms;
}
double fraction_avg = sThrottleFractionAverage.getAverage(1024.0); // sThrottleFraction averaged over the past second, or 1024 if there is no data.
double fraction_avg = throttle_fraction_w->average.getAverage(1024.0); // throttle_fraction_w->fraction averaged over the past second, or 1024 if there is no data.
// Adjust sThrottleFraction based on total bandwidth usage.
// Adjust the fraction based on total bandwidth usage.
if (total_bandwidth == 0)
*throttle_fraction_w = 1024;
throttle_fraction_w->fraction = 1024;
else
{
// This is the main formula. It can be made plausible by assuming
// an equilibrium where total_bandwidth == max_bandwidth and
// thus sThrottleFraction == fraction_avg for more than a second.
// thus fraction == fraction_avg for more than a second.
//
// Then, more bandwidth is being used (for example because another
// service starts downloading). Assuming that all services that use
@@ -2765,23 +2758,21 @@ bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth
// For example, let max_bandwidth be 1. Let there be two throttled
// services, each using 0.5 (fraction_avg = 1024/2). Let the new
// service use what it can: also 0.5 - then without reduction the
// total_bandwidth would become 1.5, and sThrottleFraction would
// total_bandwidth would become 1.5, and fraction would
// become (1024/2) * 1/1.5 = 1024/3: from 2 to 3 services.
//
// In reality, total_bandwidth would rise linear from 1.0 to 1.5 in
// one second if the throttle fraction wasn't changed. However it is
// changed here. The end result is that any change more or less
// linear fades away in one second.
*throttle_fraction_w = fraction_avg * max_bandwidth / total_bandwidth + 0.5;
throttle_fraction_w->fraction = llmin(1024., fraction_avg * max_bandwidth / total_bandwidth + 0.5);
}
if (*throttle_fraction_w > 1024)
*throttle_fraction_w = 1024;
if (total_bandwidth > max_bandwidth)
{
*throttle_fraction_w *= 0.95;
throttle_fraction_w->fraction *= 0.95;
}
// Throttle this service if it uses too much bandwidth.
return (service_bandwidth > (max_bandwidth * *throttle_fraction_w / 1024));
return (service_bandwidth > (max_bandwidth * throttle_fraction_w->fraction / 1024));
}