Add a PerHostRequestQueue object.

Adds a std::map for hostname (or urls) --> PerHostRequestQueue
objects. The latter keeps track of the number of added curl easy
requests and decides if a new request should be throttled or
not, as well as provides the queue to queue throttled requests.

At the moment CurlConcurrentConnectionsPerHost is set to 16,
because things really don't work without LL supporting connection
reuse if we limit it to 2. CurlConcurrentConnectionsPerHost is
also set to non-persistent so that we can easily change it in the future
(once we decide on it's final value it can be set to persistent).
This commit is contained in:
Aleric Inglewood
2012-11-07 15:41:50 +01:00
parent cb52e82a60
commit 68cbf31c8b
11 changed files with 414 additions and 54 deletions

View File

@@ -31,6 +31,7 @@
#include "linden_common.h"
#include "aicurlthread.h"
#include "aihttptimeoutpolicy.h"
#include "aicurlperhost.h"
#include "lltimer.h" // ms_sleep, get_clock_count
#include "llhttpstatuscodes.h"
#include "llbuffer.h"
@@ -1413,6 +1414,8 @@ void AICurlThread::run(void)
}
multi_handle_w->check_msg_queue();
}
// Clear the queued requests.
PerHostRequestQueue::purge();
}
AICurlMultiHandle::destroyInstance();
}
@@ -1531,27 +1534,35 @@ CURLMsg const* MultiHandle::info_read(int* msgs_in_queue) const
return ret;
}
static U32 curl_concurrent_connections = 8; // Initialized on start up by startCurlThread().
static U32 curl_max_total_concurrent_connections = 32; // Initialized on start up by startCurlThread().
void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
{
if (mAddedEasyRequests.size() < curl_concurrent_connections) // Not throttled?
bool throttled = true; // Default.
PerHostRequestQueuePtr per_host;
{
CURLMcode ret;
AICurlEasyRequest_wat curl_easy_request_w(*easy_request);
per_host = curl_easy_request_w->getPerHostPtr();
PerHostRequestQueue_wat per_host_w(*per_host);
if (mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_host_w->throttled())
{
AICurlEasyRequest_wat curl_easy_request_w(*easy_request);
curl_easy_request_w->set_timeout_opts();
ret = curl_easy_request_w->add_handle_to_multi(curl_easy_request_w, mMultiHandle);
}
if (ret == CURLM_OK)
{
std::pair<addedEasyRequests_type::iterator, bool> res = mAddedEasyRequests.insert(easy_request);
llassert(res.second); // May not have been added before.
Dout(dc::curl, "MultiHandle::add_easy_request: Added AICurlEasyRequest " << (void*)easy_request.get_ptr().get() << "; now processing " << mAddedEasyRequests.size() << " easy handles.");
return;
if (curl_easy_request_w->add_handle_to_multi(curl_easy_request_w, mMultiHandle) == CURLM_OK)
{
per_host_w->added_to_multi_handle(); // (About to be) added to mAddedEasyRequests.
throttled = false; // Fall through...
}
}
} // Release the lock on easy_request.
if (!throttled)
{ // ... to here.
std::pair<addedEasyRequests_type::iterator, bool> res = mAddedEasyRequests.insert(easy_request);
llassert(res.second); // May not have been added before.
Dout(dc::curl, "MultiHandle::add_easy_request: Added AICurlEasyRequest " << (void*)easy_request.get_ptr().get() << "; now processing " << mAddedEasyRequests.size() << " easy handles.");
return;
}
mQueuedRequests.push_back(easy_request);
// The request could not be added, we have to queue it.
PerHostRequestQueue_wat(*per_host)->queue(easy_request);
#ifdef SHOW_ASSERT
// Not active yet, but it's no longer an error if next we try to remove the request.
AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = false;
@@ -1560,31 +1571,22 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request, bool as_per_command)
{
AICurlEasyRequest_wat easy_request_w(*easy_request);
addedEasyRequests_type::iterator iter = mAddedEasyRequests.find(easy_request);
if (iter == mAddedEasyRequests.end())
{
// The request could be queued.
std::deque<AICurlEasyRequest>::iterator const end = mQueuedRequests.end();
std::deque<AICurlEasyRequest>::iterator cur = std::find(mQueuedRequests.begin(), end, easy_request);
if (cur != end)
{
// We can't use erase because that uses assignment to move elements, which is private because it isn't thread-safe for AICurlEasyRequest.
// Therefore, move the element that we found to the back with swap (could just swap with the end immediately,
// but I don't want to break the order in which requests where added). Swap is also not thread-safe, but OK here
// because it only touches the AICurlEasyRequest objects in the deque, and the deque is protected by the
// lock on MultiHandle.
std::deque<AICurlEasyRequest>::iterator prev = cur;
while (++cur != end)
{
prev->swap(*cur);
prev = cur;
}
#ifdef SHOW_ASSERT
// Now a second remove command would be an error again.
AICurlEasyRequest_wat(**prev)->mRemovedPerCommand = true;
bool removed =
#endif
mQueuedRequests.pop_back();
easy_request_w->removeFromPerHostQueue(easy_request);
#ifdef SHOW_ASSERT
if (removed)
{
// Now a second remove command would be an error again.
AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = true;
}
#endif
return (CURLMcode)-2; // Was already removed before, or never added (queued).
}
return remove_easy_request(iter, as_per_command);
@@ -1593,9 +1595,12 @@ CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request
CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator const& iter, bool as_per_command)
{
CURLMcode res;
PerHostRequestQueuePtr per_host;
{
AICurlEasyRequest_wat curl_easy_request_w(**iter);
res = curl_easy_request_w->remove_handle_from_multi(curl_easy_request_w, mMultiHandle);
per_host = curl_easy_request_w->getPerHostPtr();
PerHostRequestQueue_wat(*per_host)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests.
#ifdef SHOW_ASSERT
curl_easy_request_w->mRemovedPerCommand = as_per_command;
#endif
@@ -1607,12 +1612,7 @@ CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator cons
Dout(dc::curl, "MultiHandle::remove_easy_request: Removed AICurlEasyRequest " << (void*)lockobj << "; now processing " << mAddedEasyRequests.size() << " easy handles.");
// Attempt to add a queued request, if any.
if (!mQueuedRequests.empty())
{
add_easy_request(mQueuedRequests.front());
mQueuedRequests.pop_front();
}
PerHostRequestQueue_wat(*per_host)->add_queued_to(this);
return res;
}
@@ -2455,23 +2455,37 @@ void AICurlEasyRequest::removeRequest(void)
namespace AICurlInterface {
void startCurlThread(U32 CurlConcurrentConnections, bool NoVerifySSLCert)
void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentConnectionsPerHost, bool NoVerifySSLCert)
{
using namespace AICurlPrivate;
using namespace AICurlPrivate::curlthread;
llassert(is_main_thread());
curl_concurrent_connections = CurlConcurrentConnections; // Debug Setting.
gNoVerifySSLCert = NoVerifySSLCert; // Debug Setting.
// Cache Debug Settings.
curl_max_total_concurrent_connections = CurlMaxTotalConcurrentConnections;
curl_concurrent_connections_per_host = CurlConcurrentConnectionsPerHost;
gNoVerifySSLCert = NoVerifySSLCert;
AICurlThread::sInstance = new AICurlThread;
AICurlThread::sInstance->start();
}
bool handleCurlConcurrentConnections(LLSD const& newvalue)
bool handleCurlMaxTotalConcurrentConnections(LLSD const& newvalue)
{
using namespace AICurlPrivate::curlthread;
curl_concurrent_connections = newvalue.asInteger();
llinfos << "CurlConcurrentConnections set to " << curl_concurrent_connections << llendl;
curl_max_total_concurrent_connections = newvalue.asInteger();
llinfos << "CurlMaxTotalConcurrentConnections set to " << curl_max_total_concurrent_connections << llendl;
return true;
}
bool handleCurlConcurrentConnectionsPerHost(LLSD const& newvalue)
{
using namespace AICurlPrivate;
curl_concurrent_connections_per_host = newvalue.asInteger();
llinfos << "CurlConcurrentConnectionsPerHost set to " << curl_concurrent_connections_per_host << llendl;
return true;
}