Add HTTP bandwidth throttling for every other responder.

Most notably getMesh (the only one likely to use any significant
bandwidth), but in general every type of request that just has to
happen anyway, and in the order requested: such requests are still
passed straight to the curl thread, but now the curl thread will queue
them and hold them back if the (general) service they use is loaded
too heavily.
This commit is contained in:
Aleric Inglewood
2013-05-05 23:44:01 +02:00
parent 75a45f501e
commit 1d629438c0
10 changed files with 62 additions and 32 deletions

View File

@@ -1269,7 +1269,8 @@ LLMutex BufferedCurlEasyRequest::sResponderCallbackMutex;
bool BufferedCurlEasyRequest::sShuttingDown = false;
AIAverage BufferedCurlEasyRequest::sHTTPBandwidth(25);
BufferedCurlEasyRequest::BufferedCurlEasyRequest() : mRequestTransferedBytes(0), mTotalRawBytes(0), mBufferEventsTarget(NULL), mStatus(HTTP_INTERNAL_ERROR_OTHER)
BufferedCurlEasyRequest::BufferedCurlEasyRequest() :
mRequestTransferedBytes(0), mTotalRawBytes(0), mBufferEventsTarget(NULL), mStatus(HTTP_INTERNAL_ERROR_OTHER), mQueueIfTooMuchBandwidthUsage(false)
{
AICurlInterface::Stats::BufferedCurlEasyRequest_count++;
}

View File

@@ -75,11 +75,11 @@ typedef boost::intrusive_ptr<AICurlPrivate::RefCountedThreadSafePerServiceReques
//-----------------------------------------------------------------------------
// AIPerService
// This class provides a static interface to create and maintain instances
// of AIPerService objects, so that at any moment there is at most
// one instance per hostname:port. Those instances then are used to queue curl
// requests when the maximum number of connections for that host already
// have been reached.
// This class provides a static interface to create and maintain instances of AIPerService objects,
// so that at any moment there is at most one instance per service (hostname:port).
// Those instances are then used to queue curl requests when the maximum number of connections
// for that service has already been reached, and to keep track of the bandwidth usage and the
// number of queued requests in the pipeline for this service.
class AIPerService {
private:
typedef std::map<std::string, AIPerServicePtr> instance_map_type;
@@ -136,7 +136,7 @@ class AIPerService {
static U64 sLastTime_sMaxPipelinedRequests_increment; // Last time that sMaxPipelinedRequests was incremented.
static U64 sLastTime_sMaxPipelinedRequests_decrement; // Last time that sMaxPipelinedRequests was decremented.
static U64 sLastTime_ThrottleFractionAverage_add; // Last time that sThrottleFraction was added to sThrottleFractionAverage.
static size_t sThrottleFraction; // A value between 0 and 1024: each service is throttled when it uses more than max_bandwidth * (sThrottleFraction/1024) bandwidth.
static AIThreadSafeSimpleDC<size_t> sThrottleFraction; // A value between 0 and 1024: each service is throttled when it uses more than max_bandwidth * (sThrottleFraction/1024) bandwidth.
static AIAverage sThrottleFractionAverage; // Average of sThrottleFraction over 25 * 40ms = 1 second.
static size_t sHTTPThrottleBandwidth125; // HTTPThrottleBandwidth times 125 (in bytes/s).
static bool sNoHTTPBandwidthThrottling; // Global override to disable bandwidth throttling.
@@ -173,7 +173,7 @@ class AIPerService {
// the latency between request and actual delivery becomes too large.
static bool wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service);
// Return true if too much bandwidth is being used.
static bool checkBandwidthUsage(U64 sTime_40ms, AIAverage* http_bandwidth_ptr);
static bool checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth_ptr);
// Accessor for when curl_max_total_concurrent_connections changes.
static LLAtomicS32& maxPipelinedRequests(void) { return sMaxPipelinedRequests; }

View File

@@ -375,6 +375,9 @@ class BufferedCurlEasyRequest : public CurlEasyRequest {
void resetState(void);
void prepRequest(AICurlEasyRequest_wat& buffered_curl_easy_request_w, AIHTTPHeaders const& headers, LLHTTPClient::ResponderPtr responder);
// Called if this request should be queued on the curl thread when too much bandwidth is being used.
void queue_if_too_much_bandwidth_usage(void) { mQueueIfTooMuchBandwidthUsage = true; }
buffer_ptr_t& getInput(void) { return mInput; }
buffer_ptr_t& getOutput(void) { return mOutput; }
@@ -416,6 +419,7 @@ class BufferedCurlEasyRequest : public CurlEasyRequest {
U32 mRequestTransferedBytes;
size_t mTotalRawBytes; // Raw body data (still, possibly, compressed) received from the server so far.
AIBufferedCurlEasyRequestEvents* mBufferEventsTarget;
bool mQueueIfTooMuchBandwidthUsage; // Set if the curl thread should check bandwidth usage and queue this request if too much is being used.
public:
static LLChannelDescriptors const sChannels; // Channel object for mInput (channel out()) and mOutput (channel in()).
@@ -452,6 +456,9 @@ class BufferedCurlEasyRequest : public CurlEasyRequest {
// Return true when prepRequest was already called and the object has not been
// invalidated as a result of calling timed_out().
bool isValid(void) const { return mResponder; }
// Returns true when this request should be queued by the curl thread when too much bandwidth is being used.
bool queueIfTooMuchBandwidthUsage(void) const { return mQueueIfTooMuchBandwidthUsage; }
};
inline ThreadSafeBufferedCurlEasyRequest* CurlEasyRequest::get_lockobj(void)

View File

@@ -1708,8 +1708,11 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
{
AICurlEasyRequest_wat curl_easy_request_w(*easy_request);
per_service = curl_easy_request_w->getPerServicePtr();
bool too_much_bandwidth = curl_easy_request_w->queueIfTooMuchBandwidthUsage() &&
AIPerService::checkBandwidthUsage(get_clock_count() * HTTPTimeout::sClockWidth_40ms,
PerServiceRequestQueue_wat(*per_service)->bandwidth());
PerServiceRequestQueue_wat per_service_w(*per_service);
if (mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled())
if (!too_much_bandwidth && mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled())
{
curl_easy_request_w->set_timeout_opts();
if (curl_easy_request_w->add_handle_to_multi(curl_easy_request_w, mMultiHandle) == CURLM_OK)
@@ -2578,7 +2581,7 @@ U64 AIPerService::sLastTime_sMaxPipelinedRequests_increment = 0;
U64 AIPerService::sLastTime_sMaxPipelinedRequests_decrement = 0;
LLAtomicS32 AIPerService::sMaxPipelinedRequests(32);
U64 AIPerService::sLastTime_ThrottleFractionAverage_add = 0;
size_t AIPerService::sThrottleFraction = 1024;
AIThreadSafeSimpleDC<size_t> AIPerService::sThrottleFraction(1024);
AIAverage AIPerService::sThrottleFractionAverage(25);
size_t AIPerService::sHTTPThrottleBandwidth125 = 250000;
bool AIPerService::sNoHTTPBandwidthThrottling;
@@ -2678,7 +2681,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service)
}
// Throttle on bandwidth usage.
if (checkBandwidthUsage(sTime_40ms, http_bandwidth_ptr))
if (checkBandwidthUsage(sTime_40ms, *http_bandwidth_ptr))
{
// Too much bandwidth is being used, either in total or for this service.
return false;
@@ -2717,7 +2720,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service)
return !reject;
}
bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage* http_bandwidth_ptr)
bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage& http_bandwidth)
{
if (sNoHTTPBandwidthThrottling)
return false;
@@ -2727,19 +2730,21 @@ bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage* http_bandwidth
// Truncate the sums to the last second, and get their value.
size_t const max_bandwidth = AIPerService::getHTTPThrottleBandwidth125();
size_t const total_bandwidth = BufferedCurlEasyRequest::sHTTPBandwidth.truncateData(sTime_40ms); // Bytes received in the past second.
size_t const service_bandwidth = http_bandwidth_ptr->truncateData(sTime_40ms); // Idem for just this service.
size_t const service_bandwidth = http_bandwidth.truncateData(sTime_40ms); // Idem for just this service.
AIAccess<size_t> throttle_fraction_w(sThrottleFraction);
// Note that sLastTime_ThrottleFractionAverage_add is protected by the lock on sThrottleFraction...
if (sTime_40ms > sLastTime_ThrottleFractionAverage_add)
{
sThrottleFractionAverage.addData(sThrottleFraction, sTime_40ms);
sThrottleFractionAverage.addData(*throttle_fraction_w, sTime_40ms);
// Only add sThrottleFraction once every 40 ms at most.
// It's ok to ignore other values in the same 40 ms because the value only changes on the scale of 1 second.
sLastTime_ThrottleFractionAverage_add = sTime_40ms;
}
double fraction_avg = sThrottleFractionAverage.getAverage(1024.0); // sThrottleFraction averaged over the past second, or 1024 if there is no data.
double fraction_avg = sThrottleFractionAverage.getAverage(1024.0); // sThrottleFraction averaged over the past second, or 1024 if there is no data.
// Adjust sThrottleFraction based on total bandwidth usage.
if (total_bandwidth == 0)
sThrottleFraction = 1024;
*throttle_fraction_w = 1024;
else
{
// This is the main formula. It can be made plausible by assuming
@@ -2764,16 +2769,16 @@ bool AIPerService::checkBandwidthUsage(U64 sTime_40ms, AIAverage* http_bandwidth
// one second if the throttle fraction wasn't changed. However it is
// changed here. The end result is that any change more or less
// linearly fades away in one second.
sThrottleFraction = fraction_avg * max_bandwidth / total_bandwidth;
*throttle_fraction_w = fraction_avg * max_bandwidth / total_bandwidth + 0.5;
}
if (sThrottleFraction > 1024)
sThrottleFraction = 1024;
if (*throttle_fraction_w > 1024)
*throttle_fraction_w = 1024;
if (total_bandwidth > max_bandwidth)
{
sThrottleFraction *= 0.95;
*throttle_fraction_w *= 0.95;
}
// Throttle this service if it uses too much bandwidth.
return (service_bandwidth > (max_bandwidth * sThrottleFraction / 1024));
return (service_bandwidth > (max_bandwidth * *throttle_fraction_w / 1024));
}

View File

@@ -208,7 +208,8 @@ void LLHTTPClient::request(
EAllowCompressedReply allow_compression,
AIStateMachine* parent,
AIStateMachine::state_type new_parent_state,
AIEngine* default_engine)
AIEngine* default_engine,
bool queue_if_too_much_bandwidth_usage)
{
llassert(responder);
@@ -221,7 +222,7 @@ void LLHTTPClient::request(
LLURLRequest* req;
try
{
req = new LLURLRequest(method, url, body_injector, responder, headers, keepalive, does_auth, allow_compression);
req = new LLURLRequest(method, url, body_injector, responder, headers, keepalive, does_auth, allow_compression, queue_if_too_much_bandwidth_usage);
#ifdef DEBUG_CURLIO
req->mCurlEasyRequest.debug(debug);
#endif
@@ -701,6 +702,11 @@ void LLHTTPClient::post(std::string const& url, LLSD const& body, ResponderPtr r
request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state);
}
void LLHTTPClient::post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state)
{
request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state, &gMainThreadEngine, false);
}
void LLHTTPClient::postXMLRPC(std::string const& url, XMLRPC_REQUEST xmlrpc_request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive)
{
request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, allow_compressed_reply);

View File

@@ -433,7 +433,8 @@ public:
EAllowCompressedReply allow_compression = allow_compressed_reply,
AIStateMachine* parent = NULL,
/*AIStateMachine::state_type*/ U32 new_parent_state = 0,
AIEngine* default_engine = &gMainThreadEngine);
AIEngine* default_engine = &gMainThreadEngine,
bool queue_if_too_much_bandwidth_usage = true);
/** @name non-blocking API */
//@{
@@ -465,6 +466,10 @@ public:
static void post(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0)
{ AIHTTPHeaders headers; post(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); }
static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0);
static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0)
{ AIHTTPHeaders headers; post_nb(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); }
/** Takes ownership of request and deletes it when sent */
static void postXMLRPC(std::string const& url, XMLRPC_REQUEST request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive);
static void postXMLRPC(std::string const& url, XMLRPC_REQUEST request, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive)

View File

@@ -75,10 +75,15 @@ std::string LLURLRequest::actionAsVerb(LLURLRequest::ERequestAction action)
// This might throw AICurlNoEasyHandle.
LLURLRequest::LLURLRequest(LLURLRequest::ERequestAction action, std::string const& url, Injector* body,
LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool compression) :
LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool compression,
bool queue_if_too_much_bandwidth_usage) :
mAction(action), mURL(url), mKeepAlive(keepalive), mIsAuth(is_auth), mNoCompression(!compression),
mBody(body), mResponder(responder), mHeaders(headers), mResponderNameCache(responder ? responder->getName() : "<uninitialized>")
{
if (queue_if_too_much_bandwidth_usage)
{
AICurlEasyRequest_wat(*mCurlEasyRequest)->queue_if_too_much_bandwidth_usage();
}
}
void LLURLRequest::initialize_impl(void)

View File

@@ -64,7 +64,9 @@ class LLURLRequest : public AICurlEasyRequestStateMachine {
* @param action One of the ERequestAction enumerations.
* @param url The url of the request. It should already be encoded.
*/
LLURLRequest(ERequestAction action, std::string const& url, Injector* body, LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool no_compression);
LLURLRequest(ERequestAction action, std::string const& url, Injector* body,
LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers,
bool keepalive, bool is_auth, bool no_compression, bool queue_if_too_much_bandwidth_usage);
/**
* @brief Cached value of responder->getName() as passed to the constructor.

View File

@@ -697,14 +697,14 @@ void LLInventoryModelBackgroundFetch::bulkFetch()
if (folder_request_body["folders"].size())
{
LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body, recursive_cats);
LLHTTPClient::post(url, folder_request_body, fetcher);
LLHTTPClient::post_nb(url, folder_request_body, fetcher);
}
if (folder_request_body_lib["folders"].size())
{
std::string url_lib = gAgent.getRegion()->getCapability("FetchLibDescendents2");
LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body_lib, recursive_cats);
LLHTTPClient::post(url_lib, folder_request_body_lib, fetcher);
LLHTTPClient::post_nb(url_lib, folder_request_body_lib, fetcher);
}
}
if (item_count)
@@ -722,7 +722,7 @@ void LLInventoryModelBackgroundFetch::bulkFetch()
body["agent_id"] = gAgent.getID();
body["items"] = item_request_body;
LLHTTPClient::post(url, body, new LLInventoryModelFetchItemResponder(body));
LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body));
}
}
@@ -738,13 +738,12 @@ void LLInventoryModelBackgroundFetch::bulkFetch()
body["agent_id"] = gAgent.getID();
body["items"] = item_request_body_lib;
LLHTTPClient::post(url, body, new LLInventoryModelFetchItemResponder(body));
LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body));
}
}
}
mFetchTimer.reset();
}
else if (isBulkFetchProcessingComplete())
{
llinfos << "Inventory fetch completed" << llendl;

View File

@@ -1321,7 +1321,7 @@ bool LLTextureFetchWorker::doWork(S32 param)
}
LLHTTPClient::request(mUrl, LLHTTPClient::HTTP_GET, NULL,
new HTTPGetResponder(mFetcher, mID, LLTimer::getTotalTime(), mRequestedSize, mRequestedOffset, true),
headers/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL);
headers/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL, false);
res = true;
}
if (!res)