Rewrite of AIPerService::add_queued_to
This should fix any 'stalls' with zero added requests for mesh.
This commit is contained in:
@@ -74,8 +74,6 @@ AIPerService::AIPerService(void) :
|
||||
mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms.
|
||||
mConcurrentConnections(CurlConcurrentConnectionsPerService),
|
||||
mTotalAdded(0),
|
||||
mApprovedFirst(0),
|
||||
mUnapprovedFirst(0),
|
||||
mUsedCT(0),
|
||||
mCTInUse(0)
|
||||
{
|
||||
@@ -397,99 +395,122 @@ bool AIPerService::cancel(AICurlEasyRequest const& easy_request, AICapabilityTyp
|
||||
return true;
|
||||
}
|
||||
|
||||
void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle, bool recursive)
|
||||
void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle, bool only_this_service)
|
||||
{
|
||||
int order[number_of_capability_types];
|
||||
// The first two types are approved types, they should be the first to try.
|
||||
// Try the one that has the largest queue first, if they the queues have equal size, try mApprovedFirst first.
|
||||
size_t s0 = mCapabilityType[0].mQueuedRequests.size();
|
||||
size_t s1 = mCapabilityType[1].mQueuedRequests.size();
|
||||
if (s0 == s1)
|
||||
U32 success = 0; // The CTs that we successfully added a request for from the queue.
|
||||
bool success_this_pass = false;
|
||||
int i = 0;
|
||||
// The first pass we only look at CTs with 0 requests added to the multi handle. Subsequent passes only non-zero ones.
|
||||
for (int pass = 0;; ++i)
|
||||
{
|
||||
order[0] = mApprovedFirst;
|
||||
mApprovedFirst = 1 - mApprovedFirst;
|
||||
order[1] = mApprovedFirst;
|
||||
}
|
||||
else if (s0 > s1)
|
||||
{
|
||||
order[0] = 0;
|
||||
order[1] = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
order[0] = 1;
|
||||
order[1] = 0;
|
||||
}
|
||||
// The next two types are unapproved types. Here, try them alternating regardless of queue size.
|
||||
int n = mUnapprovedFirst;
|
||||
for (int i = 2; i < number_of_capability_types; ++i, n = (n + 1) % (number_of_capability_types - 2))
|
||||
{
|
||||
order[i] = 2 + n;
|
||||
}
|
||||
mUnapprovedFirst = (mUnapprovedFirst + 1) % (number_of_capability_types - 2);
|
||||
|
||||
for (int i = 0; i < number_of_capability_types; ++i)
|
||||
{
|
||||
CapabilityType& ct(mCapabilityType[order[i]]);
|
||||
if (!ct.mQueuedRequests.empty())
|
||||
if (i == number_of_capability_types)
|
||||
{
|
||||
i = 0;
|
||||
// Keep trying until we couldn't add anything anymore.
|
||||
if (pass++ && !success_this_pass)
|
||||
{
|
||||
// Done.
|
||||
break;
|
||||
}
|
||||
success_this_pass = false;
|
||||
}
|
||||
CapabilityType& ct(mCapabilityType[i]);
|
||||
if (!pass != !ct.mAdded) // Does mAdded match what we're looking for (first mAdded == 0, then mAdded != 0)?
|
||||
{
|
||||
continue;
|
||||
}
|
||||
if (multi_handle->added_maximum())
|
||||
{
|
||||
// We hit the maximum number of global connections. Abort every attempt to add anything.
|
||||
only_this_service = true;
|
||||
break;
|
||||
}
|
||||
if (mTotalAdded >= mConcurrentConnections)
|
||||
{
|
||||
// We hit the maximum number of connections for this service. Abort any attempt to add anything to this service.
|
||||
break;
|
||||
}
|
||||
if (ct.mAdded >= ct.mConcurrentConnections)
|
||||
{
|
||||
// We hit the maximum number of connections for this capability type. Try the next one.
|
||||
continue;
|
||||
}
|
||||
U32 mask = CT2mask((AICapabilityType)i);
|
||||
if (ct.mQueuedRequests.empty()) // Is there anything in the queue (left) at all?
|
||||
{
|
||||
// We could add a new request, but there is none in the queue!
|
||||
// Note that if this service does not serve this capability type,
|
||||
// then obviously this queue was empty; however, in that case
|
||||
// this variable will never be looked at, so it's ok to set it.
|
||||
ct.mFlags |= ((success & mask) ? ctf_empty : ctf_starvation);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Attempt to add the front of the queue.
|
||||
if (!multi_handle->add_easy_request(ct.mQueuedRequests.front(), true))
|
||||
{
|
||||
// Throttled. If this failed then every capability type will fail: we either are using too much bandwidth, or too many total connections.
|
||||
// However, it MAY be that this service was thottled for using too much bandwidth by itself. Look if other services can be added.
|
||||
// If that failed then we got throttled on bandwidth because the maximum number of connections were not reached yet.
|
||||
// Therefore this will keep failing for this service, we abort any additional attempt to add something for this service.
|
||||
break;
|
||||
}
|
||||
// Request was added, remove it from the queue.
|
||||
ct.mQueuedRequests.pop_front();
|
||||
if (ct.mQueuedRequests.empty())
|
||||
{
|
||||
// We obtained a request from the queue, and after that there we no more request in the queue of this service.
|
||||
ct.mFlags |= ctf_empty;
|
||||
}
|
||||
else
|
||||
{
|
||||
// We obtained a request from the queue, and even after that there was at least one more request in the queue of this service.
|
||||
ct.mFlags |= ctf_full;
|
||||
}
|
||||
// Mark that at least one request of this CT was successfully added.
|
||||
success |= mask;
|
||||
success_this_pass = true;
|
||||
// Update count and the empty flag of sTotalQueued.
|
||||
TotalQueued_wat total_queued_w(sTotalQueued);
|
||||
llassert(total_queued_w->count > 0);
|
||||
if (!--(total_queued_w->count))
|
||||
{
|
||||
// We obtained a request from the queue, and after that there we no more request in any queue.
|
||||
total_queued_w->empty = true;
|
||||
// Since there is no request left anywhere anymore, abort looking for one.
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
// We obtained a request from the queue, and even after that there was at least one more request in some queue.
|
||||
total_queued_w->full = true;
|
||||
}
|
||||
// We added something from a queue, so we're done.
|
||||
return;
|
||||
}
|
||||
else
|
||||
}
|
||||
|
||||
for (int i = 0; i < number_of_capability_types; ++i)
|
||||
{
|
||||
CapabilityType& ct(mCapabilityType[i]);
|
||||
U32 mask = CT2mask((AICapabilityType)i);
|
||||
// Skip CTs that we didn't add anything for.
|
||||
if (!(success & mask))
|
||||
{
|
||||
// We could add a new request, but there is none in the queue!
|
||||
// Note that if this service does not serve this capability type,
|
||||
// then obviously this queue was empty; however, in that case
|
||||
// this variable will never be looked at, so it's ok to set it.
|
||||
ct.mFlags |= ctf_starvation;
|
||||
continue;
|
||||
}
|
||||
if (i == number_of_capability_types - 1)
|
||||
if (!ct.mQueuedRequests.empty())
|
||||
{
|
||||
// Last entry also empty. All queues of this service were empty. Check total connections.
|
||||
TotalQueued_wat total_queued_w(sTotalQueued);
|
||||
if (total_queued_w->count == 0)
|
||||
// We obtained one or more requests from the queue, and even after that there was at least one more request in the queue of this CT.
|
||||
ct.mFlags |= ctf_full;
|
||||
}
|
||||
}
|
||||
|
||||
// Update the starvation and full flags of sTotalQueued.
|
||||
{
|
||||
TotalQueued_wat total_queued_w(sTotalQueued);
|
||||
if (total_queued_w->count == 0)
|
||||
{
|
||||
if (!success)
|
||||
{
|
||||
// The queue of every service is empty!
|
||||
total_queued_w->starvation = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
else if (success)
|
||||
{
|
||||
// We obtained a request from the queue, and even after that there was at least one more request in some queue.
|
||||
total_queued_w->full = true;
|
||||
}
|
||||
}
|
||||
if (recursive)
|
||||
|
||||
// Don't try other services if anything was added successfully.
|
||||
if (success || only_this_service)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Nothing from this service could be added, try other services.
|
||||
instance_map_wat instance_map_w(sInstanceMap);
|
||||
for (iterator service = instance_map_w->begin(); service != instance_map_w->end(); ++service)
|
||||
|
||||
@@ -160,8 +160,6 @@ class AIPerService {
|
||||
AIAverage mHTTPBandwidth; // Keeps track on number of bytes received for this service in the past second.
|
||||
int mConcurrentConnections; // The maximum number of allowed concurrent connections to this service.
|
||||
int mTotalAdded; // Number of active easy handles with this service.
|
||||
int mApprovedFirst; // First capability type to try.
|
||||
int mUnapprovedFirst; // First capability type to try after all approved types were tried.
|
||||
|
||||
U32 mUsedCT; // Bit mask with one bit per capability type. A '1' means the capability was in use since the last resetUsedCT().
|
||||
U32 mCTInUse; // Bit mask with one bit per capability type. A '1' means the capability is in use right now.
|
||||
@@ -260,11 +258,12 @@ class AIPerService {
|
||||
void removed_from_multi_handle(AICapabilityType capability_type, bool downloaded_something); // Called when an easy handle for this service is removed again from the multi handle.
|
||||
void download_started(AICapabilityType capability_type) { ++mCapabilityType[capability_type].mDownloading; }
|
||||
bool throttled(AICapabilityType capability_type) const; // Returns true if the maximum number of allowed requests for this service/capability type have been added to the multi handle.
|
||||
bool nothing_added(AICapabilityType capability_type) const { return mCapabilityType[capability_type].mAdded == 0; }
|
||||
|
||||
bool queue(AICurlEasyRequest const& easy_request, AICapabilityType capability_type, bool force_queuing = true); // Add easy_request to the queue if queue is empty or force_queuing.
|
||||
bool cancel(AICurlEasyRequest const& easy_request, AICapabilityType capability_type); // Remove easy_request from the queue (if it's there).
|
||||
|
||||
void add_queued_to(AICurlPrivate::curlthread::MultiHandle* mh, bool recursive = false);
|
||||
void add_queued_to(AICurlPrivate::curlthread::MultiHandle* mh, bool only_this_service = false);
|
||||
// Add queued easy handle (if any) to the multi handle. The request is removed from the queue,
|
||||
// followed by either a call to added_to_multi_handle() or to queue() to add it back.
|
||||
|
||||
|
||||
@@ -1710,7 +1710,7 @@ CURLMsg const* MultiHandle::info_read(int* msgs_in_queue) const
|
||||
return ret;
|
||||
}
|
||||
|
||||
static U32 curl_max_total_concurrent_connections = 32; // Initialized on start up by startCurlThread().
|
||||
U32 curl_max_total_concurrent_connections = 32; // Initialized on start up by startCurlThread().
|
||||
|
||||
bool MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request, bool from_queue)
|
||||
{
|
||||
@@ -1724,13 +1724,23 @@ bool MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request, bool f
|
||||
if (!from_queue)
|
||||
{
|
||||
// Add the request to the back of a non-empty queue.
|
||||
if (PerService_wat(*per_service)->queue(easy_request, capability_type, false))
|
||||
PerService_wat per_service_w(*per_service);
|
||||
if (per_service_w->queue(easy_request, capability_type, false))
|
||||
{
|
||||
// The queue was not empty, therefore the request was queued.
|
||||
#ifdef SHOW_ASSERT
|
||||
// Not active yet, but it's no longer an error if next we try to remove the request.
|
||||
curl_easy_request_w->mRemovedPerCommand = false;
|
||||
#endif
|
||||
// This is a fail-safe. Normally, if there is anything in the queue then things should
|
||||
// be running (normally an attempt is made to add from the queue whenever a request
|
||||
// finishes). However, it CAN happen on occassion that things get 'stuck' with
|
||||
// nothing running, so nothing will ever finish and therefore the queue would never
|
||||
// be checked. Only do this when there is indeed nothing running (added) though.
|
||||
if (per_service_w->nothing_added(capability_type))
|
||||
{
|
||||
per_service_w->add_queued_to(this);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +39,8 @@
|
||||
namespace AICurlPrivate {
|
||||
namespace curlthread {
|
||||
|
||||
extern U32 curl_max_total_concurrent_connections;
|
||||
|
||||
class PollSet;
|
||||
|
||||
// For ordering a std::set with AICurlEasyRequest objects.
|
||||
@@ -100,6 +102,9 @@ class MultiHandle : public CurlMultiHandle
|
||||
// Return the total number of added curl requests.
|
||||
static U32 total_added_size(void) { return sTotalAdded; }
|
||||
|
||||
// Return true if we reached the global maximum number of connections.
|
||||
static bool added_maximum(void) { return sTotalAdded >= curl_max_total_concurrent_connections; }
|
||||
|
||||
public:
|
||||
//-----------------------------------------------------------------------------
|
||||
// Curl socket administration:
|
||||
|
||||
Reference in New Issue
Block a user