Fix global connection threshold not to count non-approved queued requests as being in the pipeline.
This makes more sense when comparing the threshold against 2 times the maximum allowed total connections. As a result, textures won't stall when there are around 128 mesh requests queued.
This commit is contained in:
@@ -73,6 +73,7 @@ using namespace AICurlPrivate;
|
||||
AIPerService::AIPerService(void) :
|
||||
mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms.
|
||||
mConcurrentConnections(CurlConcurrentConnectionsPerService),
|
||||
mApprovedRequests(0),
|
||||
mTotalAdded(0),
|
||||
mUsedCT(0),
|
||||
mCTInUse(0)
|
||||
@@ -363,7 +364,10 @@ bool AIPerService::queue(AICurlEasyRequest const& easy_request, AICapabilityType
|
||||
if (needs_queuing)
|
||||
{
|
||||
queued_requests.push_back(easy_request.get_ptr());
|
||||
TotalQueued_wat(sTotalQueued)->count++;
|
||||
if (is_approved(capability_type))
|
||||
{
|
||||
TotalQueued_wat(sTotalQueued)->approved++;
|
||||
}
|
||||
}
|
||||
return needs_queuing;
|
||||
}
|
||||
@@ -389,9 +393,12 @@ bool AIPerService::cancel(AICurlEasyRequest const& easy_request, AICapabilityTyp
|
||||
prev = cur;
|
||||
}
|
||||
mCapabilityType[capability_type].mQueuedRequests.pop_back(); // if this is safe.
|
||||
TotalQueued_wat total_queued_w(sTotalQueued);
|
||||
total_queued_w->count--;
|
||||
llassert(total_queued_w->count >= 0);
|
||||
if (is_approved(capability_type))
|
||||
{
|
||||
TotalQueued_wat total_queued_w(sTotalQueued);
|
||||
llassert(total_queued_w->approved > 0);
|
||||
total_queued_w->approved--;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -458,23 +465,26 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle, bool onl
|
||||
// Mark that at least one request of this CT was successfully added.
|
||||
success |= mask;
|
||||
success_this_pass = true;
|
||||
// Update count and the empty flag of sTotalQueued.
|
||||
TotalQueued_wat total_queued_w(sTotalQueued);
|
||||
llassert(total_queued_w->count > 0);
|
||||
if (!--(total_queued_w->count))
|
||||
// Update approved count.
|
||||
if (is_approved((AICapabilityType)i))
|
||||
{
|
||||
// We obtained a request from the queue, and after that there were no more requests in any queue.
|
||||
total_queued_w->empty = true;
|
||||
// Since there is no request left anywhere anymore, abort looking for one.
|
||||
break;
|
||||
TotalQueued_wat total_queued_w(sTotalQueued);
|
||||
llassert(total_queued_w->approved > 0);
|
||||
total_queued_w->approved--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t queuedapproved_size = 0;
|
||||
for (int i = 0; i < number_of_capability_types; ++i)
|
||||
{
|
||||
CapabilityType& ct(mCapabilityType[i]);
|
||||
U32 mask = CT2mask((AICapabilityType)i);
|
||||
// Add up the size of all queues with approved requests.
|
||||
if ((approved_mask & mask))
|
||||
{
|
||||
queuedapproved_size += ct.mQueuedRequests.size();
|
||||
}
|
||||
// Skip CTs that we didn't add anything for.
|
||||
if (!(success & mask))
|
||||
{
|
||||
@@ -487,20 +497,25 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle, bool onl
|
||||
}
|
||||
}
|
||||
|
||||
// Update the starvation and full flags of sTotalQueued.
|
||||
// Update the flags of sTotalQueued.
|
||||
{
|
||||
TotalQueued_wat total_queued_w(sTotalQueued);
|
||||
if (total_queued_w->count == 0)
|
||||
if (total_queued_w->approved == 0)
|
||||
{
|
||||
if (!success)
|
||||
if ((success & approved_mask))
|
||||
{
|
||||
// The queue of every service is empty!
|
||||
// We obtained an approved request from the queue, and after that there were no more requests in any (approved) queue.
|
||||
total_queued_w->empty = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Every queue of every approved CT is empty!
|
||||
total_queued_w->starvation = true;
|
||||
}
|
||||
}
|
||||
else if (success)
|
||||
else if ((success & approved_mask))
|
||||
{
|
||||
// We obtained a request from the queue, and even after that there was at least one more request in some queue.
|
||||
// We obtained an approved request from the queue, and even after that there was at least one more request in some (approved) queue.
|
||||
total_queued_w->full = true;
|
||||
}
|
||||
}
|
||||
@@ -537,8 +552,11 @@ void AIPerService::purge(void)
|
||||
{
|
||||
size_t s = per_service_w->mCapabilityType[i].mQueuedRequests.size();
|
||||
per_service_w->mCapabilityType[i].mQueuedRequests.clear();
|
||||
total_queued_w->count -= s;
|
||||
llassert(total_queued_w->count >= 0);
|
||||
if (is_approved((AICapabilityType)i))
|
||||
{
|
||||
llassert(total_queued_w->approved >= s);
|
||||
total_queued_w->approved -= s;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -575,8 +593,9 @@ void AIPerService::Approvement::honored(void)
|
||||
{
|
||||
mHonored = true;
|
||||
PerService_wat per_service_w(*mPerServicePtr);
|
||||
llassert(per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests > 0);
|
||||
llassert(per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests > 0 && per_service_w->mApprovedRequests > 0);
|
||||
per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests--;
|
||||
per_service_w->mApprovedRequests--;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -87,6 +87,8 @@ enum AICapabilityType { // {Capabilities} [Responders]
|
||||
number_of_capability_types = 4
|
||||
};
|
||||
|
||||
static U32 const approved_mask = 3; // The mask of cap_texture OR-ed with the mask of cap_inventory.
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// AIPerService
|
||||
|
||||
@@ -137,7 +139,7 @@ class AIPerService {
|
||||
typedef std::deque<AICurlPrivate::BufferedCurlEasyRequestPtr> queued_request_type;
|
||||
|
||||
queued_request_type mQueuedRequests; // Waiting (throttled) requests.
|
||||
U16 mApprovedRequests; // The number of approved requests by approveHTTPRequestFor that were not added to the command queue yet.
|
||||
U16 mApprovedRequests; // The number of approved requests for this CT by approveHTTPRequestFor that were not added to the command queue yet.
|
||||
U16 mQueuedCommands; // Number of add commands (minus remove commands), for this service, in the command queue.
|
||||
U16 mAdded; // Number of active easy handles with this service.
|
||||
U16 mFlags; // ctf_empty: Set to true when the queue becomes precisely empty.
|
||||
@@ -159,6 +161,7 @@ class AIPerService {
|
||||
|
||||
AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second.
|
||||
int mConcurrentConnections; // The maximum number of allowed concurrent connections to this service.
|
||||
int mApprovedRequests; // The number of approved requests for this service by approveHTTPRequestFor that were not added to the command queue yet.
|
||||
int mTotalAdded; // Number of active easy handles with this service.
|
||||
|
||||
U32 mUsedCT; // Bit mask with one bit per capability type. A '1' means the capability was in use since the last resetUsedCT().
|
||||
@@ -194,6 +197,7 @@ class AIPerService {
|
||||
}
|
||||
}
|
||||
public:
|
||||
static bool is_approved(AICapabilityType capability_type) { return (((U32)1 << capability_type) & approved_mask); }
|
||||
static U32 CT2mask(AICapabilityType capability_type) { return (U32)1 << capability_type; }
|
||||
void resetUsedCt(void) { mUsedCT = mCTInUse; }
|
||||
bool is_used(AICapabilityType capability_type) const { return (mUsedCT & CT2mask(capability_type)); }
|
||||
@@ -206,34 +210,34 @@ class AIPerService {
|
||||
// Global administration of the total number of queued requests of all services combined.
|
||||
private:
|
||||
struct TotalQueued {
|
||||
S32 count; // The sum of mQueuedRequests.size() of all AIPerService objects together.
|
||||
bool empty; // Set to true when count becomes precisely zero as the result of popping any queue.
|
||||
bool full; // Set to true when count is still larger than zero after popping any queue.
|
||||
bool starvation; // Set to true when any queue was about to be popped when count was already zero.
|
||||
TotalQueued(void) : count(0), empty(false), full(false), starvation(false) { }
|
||||
S32 approved; // The sum of mQueuedRequests.size() of all AIPerService::CapabilityType objects of approved types.
|
||||
bool empty; // Set to true when approved becomes precisely zero as the result of popping any queue.
|
||||
bool full; // Set to true when approved is still larger than zero after popping any queue.
|
||||
bool starvation; // Set to true when any queue was about to be popped when approved was already zero.
|
||||
TotalQueued(void) : approved(0), empty(false), full(false), starvation(false) { }
|
||||
};
|
||||
static AIThreadSafeSimpleDC<TotalQueued> sTotalQueued;
|
||||
typedef AIAccessConst<TotalQueued> TotalQueued_crat;
|
||||
typedef AIAccess<TotalQueued> TotalQueued_rat;
|
||||
typedef AIAccess<TotalQueued> TotalQueued_wat;
|
||||
public:
|
||||
static S32 total_queued_size(void) { return TotalQueued_rat(sTotalQueued)->count; }
|
||||
static S32 total_approved_queue_size(void) { return TotalQueued_rat(sTotalQueued)->approved; }
|
||||
|
||||
// Global administration of the maximum number of pipelined requests of all services combined.
|
||||
private:
|
||||
struct MaxPipelinedRequests {
|
||||
S32 count; // The maximum total number of accepted requests that didn't finish yet.
|
||||
S32 threshold; // The maximum total number of accepted requests that didn't finish yet.
|
||||
U64 last_increment; // Last time that sMaxPipelinedRequests was incremented.
|
||||
U64 last_decrement; // Last time that sMaxPipelinedRequests was decremented.
|
||||
MaxPipelinedRequests(void) : count(32), last_increment(0), last_decrement(0) { }
|
||||
MaxPipelinedRequests(void) : threshold(32), last_increment(0), last_decrement(0) { }
|
||||
};
|
||||
static AIThreadSafeSimpleDC<MaxPipelinedRequests> sMaxPipelinedRequests;
|
||||
typedef AIAccessConst<MaxPipelinedRequests> MaxPipelinedRequests_crat;
|
||||
typedef AIAccess<MaxPipelinedRequests> MaxPipelinedRequests_rat;
|
||||
typedef AIAccess<MaxPipelinedRequests> MaxPipelinedRequests_wat;
|
||||
public:
|
||||
static void setMaxPipelinedRequests(S32 count) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count = count; }
|
||||
static void incrementMaxPipelinedRequests(S32 increment) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->count += increment; }
|
||||
static void setMaxPipelinedRequests(S32 threshold) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->threshold = threshold; }
|
||||
static void incrementMaxPipelinedRequests(S32 increment) { MaxPipelinedRequests_wat(sMaxPipelinedRequests)->threshold += increment; }
|
||||
|
||||
// Global administration of throttle fraction (which is the same for all services).
|
||||
private:
|
||||
|
||||
@@ -2628,7 +2628,7 @@ U32 getNumHTTPCommands(void)
|
||||
|
||||
U32 getNumHTTPQueued(void)
|
||||
{
|
||||
return AIPerService::total_queued_size();
|
||||
return AIPerService::total_approved_queue_size();
|
||||
}
|
||||
|
||||
U32 getNumHTTPAdded(void)
|
||||
@@ -2676,6 +2676,51 @@ bool AIPerService::sNoHTTPBandwidthThrottling;
|
||||
// AIPerService::mQueuedRequests and the already running requests
|
||||
// (in MultiHandle::mAddedEasyRequests)).
|
||||
//
|
||||
// A request has two types of reasons why it can be throttled:
|
||||
// 1) The number of connections.
|
||||
// 2) Bandwidth usage.
|
||||
// And three levels where each can occur:
|
||||
// a) Global
|
||||
// b) Service
|
||||
// c) Capability Type (CT)
|
||||
// Currently, not all of those are in use. The ones that are used are:
|
||||
//
|
||||
// | Global | Service | CT
|
||||
// +--------+---------+--------
|
||||
// 1) The number of connections | X | X | X
|
||||
// 2) Bandwidth usage | X | |
|
||||
//
|
||||
// Pre-approved requests have the bandwidth tested here, and the
|
||||
// connections tested in the curl thread, right before they are
|
||||
// added to the multi handle.
|
||||
//
|
||||
// The "pipeline" is as follows:
|
||||
//
|
||||
// <approvement> // If the number of requests in the pipeline is less than a threshold
|
||||
// | // and the global bandwidth usage is not too large.
|
||||
// V
|
||||
// <command queue>
|
||||
// |
|
||||
// V
|
||||
// <CT queue>
|
||||
// |
|
||||
// V
|
||||
// <added to multi handle> // If the number of connections at all three levels allow it.
|
||||
// |
|
||||
// V
|
||||
// <removed from multi handle>
|
||||
//
|
||||
// Every time this function is called, but not more often than once every 40 ms, the state
|
||||
// of the CT queue is checked to be starvation, empty or full. If it is starvation
|
||||
// then the threshold for allowed number of connections is incremented by one,
|
||||
// if it is empty then nothing is done and when it is full then the threshold is
|
||||
// decremented by one.
|
||||
//
|
||||
// Starvation means that we could add a request from the queue to the multi handle,
|
||||
// but the queue was empty. Empty means that after adding one or more requests to the
|
||||
// multi handle the queue became empty, and full means that after adding one or more
|
||||
// requests to the multi handle the queue still wasn't empty (see AIPerService::add_queued_to).
|
||||
//
|
||||
//static
|
||||
AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr const& per_service, AICapabilityType capability_type)
|
||||
{
|
||||
@@ -2687,13 +2732,13 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c
|
||||
|
||||
// Cache all sTotalQueued info.
|
||||
bool starvation, decrement_threshold;
|
||||
S32 total_queued_or_added = MultiHandle::total_added_size();
|
||||
S32 total_approved_queuedapproved_or_added = MultiHandle::total_added_size();
|
||||
{
|
||||
TotalQueued_wat total_queued_w(sTotalQueued);
|
||||
total_queued_or_added += total_queued_w->count;
|
||||
total_approved_queuedapproved_or_added += total_queued_w->approved;
|
||||
starvation = total_queued_w->starvation;
|
||||
decrement_threshold = total_queued_w->full && !total_queued_w->empty;
|
||||
total_queued_w->empty = total_queued_w->full = false; // Reset flags.
|
||||
total_queued_w->starvation = total_queued_w->empty = total_queued_w->full = false; // Reset flags.
|
||||
}
|
||||
|
||||
// Whether or not we're going to approve a new request, decrement the global threshold first, when appropriate.
|
||||
@@ -2701,13 +2746,13 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c
|
||||
if (decrement_threshold)
|
||||
{
|
||||
MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests);
|
||||
if (max_pipelined_requests_w->count > (S32)curl_max_total_concurrent_connections &&
|
||||
if (max_pipelined_requests_w->threshold > (S32)curl_max_total_concurrent_connections &&
|
||||
sTime_40ms > max_pipelined_requests_w->last_decrement)
|
||||
{
|
||||
// Decrement the threshold because since the last call to this function at least one curl request finished
|
||||
// and was replaced with another request from the queue, but the queue never ran empty: we have too many
|
||||
// queued requests.
|
||||
max_pipelined_requests_w->count--;
|
||||
max_pipelined_requests_w->threshold--;
|
||||
// Do this at most once every 40 ms.
|
||||
max_pipelined_requests_w->last_decrement = sTime_40ms;
|
||||
}
|
||||
@@ -2746,7 +2791,9 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c
|
||||
// Before releasing the lock on per_service, stop other threads from getting a
|
||||
// too small value from pipelined_requests() and approving too many requests.
|
||||
ct.mApprovedRequests++;
|
||||
per_service_w->mApprovedRequests++;
|
||||
}
|
||||
total_approved_queuedapproved_or_added += per_service_w->mApprovedRequests;
|
||||
}
|
||||
if (reject)
|
||||
{
|
||||
@@ -2758,13 +2805,15 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c
|
||||
if (checkBandwidthUsage(per_service, sTime_40ms))
|
||||
{
|
||||
// Too much bandwidth is being used, either in total or for this service.
|
||||
PerService_wat(*per_service)->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all.
|
||||
PerService_wat per_service_w(*per_service);
|
||||
per_service_w->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all.
|
||||
per_service_w->mApprovedRequests--;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate.
|
||||
|
||||
S32 const pipelined_requests = command_queue_rat(command_queue)->size + total_queued_or_added;
|
||||
S32 const pipelined_requests = command_queue_rat(command_queue)->size + total_approved_queuedapproved_or_added;
|
||||
// We can't take the command being processed (command_being_processed) into account without
|
||||
// introducing relatively long waiting times for some mutex (namely between when the command
|
||||
// is moved from command_queue to command_being_processed, till it's actually being added to
|
||||
@@ -2773,18 +2822,19 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c
|
||||
// here instead.
|
||||
|
||||
// The maximum number of requests that may be queued in command_queue is equal to the total number of requests
|
||||
// that may exist in the pipeline minus the number of requests queued in AIPerService objects, minus
|
||||
// the number of already running requests.
|
||||
// that may exist in the pipeline minus the number approved requests not yet added to the command queue, minus the
|
||||
// number of requests queued in AIPerService objects, minus the number of already running requests
|
||||
// (excluding non-approved requests queued in their CT queue).
|
||||
MaxPipelinedRequests_wat max_pipelined_requests_w(sMaxPipelinedRequests);
|
||||
reject = pipelined_requests >= max_pipelined_requests_w->count;
|
||||
equal = pipelined_requests == max_pipelined_requests_w->count;
|
||||
reject = pipelined_requests >= max_pipelined_requests_w->threshold;
|
||||
equal = pipelined_requests == max_pipelined_requests_w->threshold;
|
||||
increment_threshold = starvation;
|
||||
if (increment_threshold && reject)
|
||||
{
|
||||
if (max_pipelined_requests_w->count < 2 * (S32)curl_max_total_concurrent_connections &&
|
||||
if (max_pipelined_requests_w->threshold < 2 * (S32)curl_max_total_concurrent_connections &&
|
||||
sTime_40ms > max_pipelined_requests_w->last_increment)
|
||||
{
|
||||
max_pipelined_requests_w->count++;
|
||||
max_pipelined_requests_w->threshold++;
|
||||
max_pipelined_requests_w->last_increment = sTime_40ms;
|
||||
// Immediately take the new threshold into account.
|
||||
reject = !equal;
|
||||
@@ -2792,7 +2842,9 @@ AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr c
|
||||
}
|
||||
if (reject)
|
||||
{
|
||||
PerService_wat(*per_service)->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all.
|
||||
PerService_wat per_service_w(*per_service);
|
||||
per_service_w->mCapabilityType[capability_type].mApprovedRequests--; // Not approved after all.
|
||||
per_service_w->mApprovedRequests--;
|
||||
return NULL;
|
||||
}
|
||||
return new Approvement(per_service, capability_type);
|
||||
|
||||
Reference in New Issue
Block a user