PerHost became PerService

Reflect the fact that we include port number in its name.
This commit is contained in:
Aleric Inglewood
2013-04-09 05:06:32 +02:00
parent 29a1d0e4e1
commit fce106f7e2
12 changed files with 148 additions and 140 deletions

View File

@@ -392,7 +392,7 @@ bool LLCrashLogger::init()
// Start curl thread.
AICurlInterface::startCurlThread(64, // CurlMaxTotalConcurrentConnections
8, // CurlConcurrentConnectionsPerHost
8, // CurlConcurrentConnectionsPerService
true); // NoVerifySSLCert
// We assume that all the logs we're looking for reside on the current drive

View File

@@ -959,9 +959,9 @@ CurlEasyRequest::~CurlEasyRequest()
// be available anymore.
send_handle_events_to(NULL);
revokeCallbacks();
if (mPerHostPtr)
if (mPerServicePtr)
{
AIPerHostRequestQueue::release(mPerHostPtr);
AIPerServiceRequestQueue::release(mPerServicePtr);
}
// This wasn't freed yet if the request never finished.
curl_slist_free_all(mHeaders);
@@ -1113,8 +1113,8 @@ void CurlEasyRequest::finalizeRequest(std::string const& url, AIHTTPTimeoutPolic
#endif
setopt(CURLOPT_HTTPHEADER, mHeaders);
setoptString(CURLOPT_URL, url);
llassert(!mPerHostPtr);
mLowercaseServicename = AIPerHostRequestQueue::extract_canonical_servicename(url);
llassert(!mPerServicePtr);
mLowercaseServicename = AIPerServiceRequestQueue::extract_canonical_servicename(url);
mTimeoutPolicy = &policy;
state_machine->setTotalDelayTimeout(policy.getTotalDelay());
// The following line is a bit tricky: we store a pointer to the object without increasing its reference count.
@@ -1236,22 +1236,22 @@ void CurlEasyRequest::queued_for_removal(AICurlEasyRequest_wat& curl_easy_reques
}
#endif
AIPerHostRequestQueuePtr CurlEasyRequest::getPerHostPtr(void)
AIPerServiceRequestQueuePtr CurlEasyRequest::getPerServicePtr(void)
{
if (!mPerHostPtr)
if (!mPerServicePtr)
{
// mPerHostPtr is really just a speed-up cache.
// mPerServicePtr is really just a speed-up cache.
// The reason we can cache it is because mLowercaseServicename is only set
// in finalizeRequest which may only be called once: it never changes.
mPerHostPtr = AIPerHostRequestQueue::instance(mLowercaseServicename);
mPerServicePtr = AIPerServiceRequestQueue::instance(mLowercaseServicename);
}
return mPerHostPtr;
return mPerServicePtr;
}
bool CurlEasyRequest::removeFromPerHostQueue(AICurlEasyRequest const& easy_request) const
bool CurlEasyRequest::removeFromPerServiceQueue(AICurlEasyRequest const& easy_request) const
{
// Note that easy_request (must) represent(s) this object; it's just passed for convenience.
return mPerHostPtr && PerHostRequestQueue_wat(*mPerHostPtr)->cancel(easy_request);
return mPerServicePtr && PerServiceRequestQueue_wat(*mPerServicePtr)->cancel(easy_request);
}
std::string CurlEasyRequest::getLowercaseHostname(void) const

View File

@@ -52,7 +52,7 @@
#include "stdtypes.h" // U16, S32, U32, F64
#include "llatomic.h" // LLAtomicU32
#include "aithreadsafe.h"
#include "aicurlperservice.h" // AIPerHostRequestQueuePtr
#include "aicurlperservice.h" // AIPerServiceRequestQueuePtr
// Debug Settings.
extern bool gNoVerifySSLCert;
@@ -155,7 +155,7 @@ struct Stats {
// Called to handle changes in Debug Settings.
bool handleCurlMaxTotalConcurrentConnections(LLSD const& newvalue);
bool handleCurlConcurrentConnectionsPerHost(LLSD const& newvalue);
bool handleCurlConcurrentConnectionsPerService(LLSD const& newvalue);
bool handleNoVerifySSLCert(LLSD const& newvalue);
// Called once at start of application (from newview/llappviewer.cpp by main thread (before threads are created)),
@@ -163,7 +163,7 @@ bool handleNoVerifySSLCert(LLSD const& newvalue);
void initCurl(void);
// Called once at start of application (from LLAppViewer::initThreads), starts AICurlThread.
void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentConnectionsPerHost, bool NoVerifySSLCert);
void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentConnectionsPerService, bool NoVerifySSLCert);
// Called once at the end of application before terminating other threads (most notably the texture thread workers)
// with the purpose to stop the curl thread from doing any call backs to running responders: the responders sometimes

View File

@@ -1,6 +1,6 @@
/**
* @file aiperservice.cpp
* @brief Implementation of AIPerHostRequestQueue
* @brief Implementation of AIPerServiceRequestQueue
*
* Copyright (c) 2012, 2013, Aleric Inglewood.
*
@@ -30,36 +30,40 @@
* 06/04/2013
* Renamed AICurlPrivate::PerHostRequestQueue[Ptr] to AIPerHostRequestQueue[Ptr]
* to allow public access.
*
* 09/04/2013
* Renamed everything "host" to "service" and use "hostname:port" as key
* instead of just "hostname".
*/
#include "sys.h"
#include "aicurlperservice.h"
#include "aicurlthread.h"
AIPerHostRequestQueue::threadsafe_instance_map_type AIPerHostRequestQueue::sInstanceMap;
LLAtomicS32 AIPerHostRequestQueue::sTotalQueued;
bool AIPerHostRequestQueue::sQueueEmpty;
bool AIPerHostRequestQueue::sQueueFull;
bool AIPerHostRequestQueue::sRequestStarvation;
AIPerServiceRequestQueue::threadsafe_instance_map_type AIPerServiceRequestQueue::sInstanceMap;
LLAtomicS32 AIPerServiceRequestQueue::sTotalQueued;
bool AIPerServiceRequestQueue::sQueueEmpty;
bool AIPerServiceRequestQueue::sQueueFull;
bool AIPerServiceRequestQueue::sRequestStarvation;
#undef AICurlPrivate
namespace AICurlPrivate {
U32 curl_concurrent_connections_per_host;
U32 curl_concurrent_connections_per_service;
// Friend functions of RefCountedThreadSafePerHostRequestQueue
// Friend functions of RefCountedThreadSafePerServiceRequestQueue
void intrusive_ptr_add_ref(RefCountedThreadSafePerHostRequestQueue* per_host)
void intrusive_ptr_add_ref(RefCountedThreadSafePerServiceRequestQueue* per_service)
{
per_host->mReferenceCount++;
per_service->mReferenceCount++;
}
void intrusive_ptr_release(RefCountedThreadSafePerHostRequestQueue* per_host)
void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* per_service)
{
if (--per_host->mReferenceCount == 0)
if (--per_service->mReferenceCount == 0)
{
delete per_host;
delete per_service;
}
}
@@ -92,7 +96,7 @@ using namespace AICurlPrivate;
// - port does not contain a ':', and if it exists is always prepended by a ':'.
//
//static
std::string AIPerHostRequestQueue::extract_canonical_servicename(std::string const& url)
std::string AIPerServiceRequestQueue::extract_canonical_servicename(std::string const& url)
{
char const* p = url.data();
char const* const end = p + url.size();
@@ -164,21 +168,21 @@ std::string AIPerHostRequestQueue::extract_canonical_servicename(std::string con
}
//static
AIPerHostRequestQueuePtr AIPerHostRequestQueue::instance(std::string const& servicename)
AIPerServiceRequestQueuePtr AIPerServiceRequestQueue::instance(std::string const& servicename)
{
llassert(!servicename.empty());
instance_map_wat instance_map_w(sInstanceMap);
AIPerHostRequestQueue::iterator iter = instance_map_w->find(servicename);
AIPerServiceRequestQueue::iterator iter = instance_map_w->find(servicename);
if (iter == instance_map_w->end())
{
iter = instance_map_w->insert(instance_map_type::value_type(servicename, new RefCountedThreadSafePerHostRequestQueue)).first;
iter = instance_map_w->insert(instance_map_type::value_type(servicename, new RefCountedThreadSafePerServiceRequestQueue)).first;
}
// Note: the creation of AIPerHostRequestQueuePtr MUST be protected by the lock on sInstanceMap (see release()).
// Note: the creation of AIPerServiceRequestQueuePtr MUST be protected by the lock on sInstanceMap (see release()).
return iter->second;
}
//static
void AIPerHostRequestQueue::release(AIPerHostRequestQueuePtr& instance)
void AIPerServiceRequestQueue::release(AIPerServiceRequestQueuePtr& instance)
{
if (instance->exactly_two_left()) // Being 'instance' and the one in sInstanceMap.
{
@@ -196,7 +200,7 @@ void AIPerHostRequestQueue::release(AIPerHostRequestQueuePtr& instance)
return;
}
// The reference in the map is the last one; that means there can't be any curl easy requests queued for this host.
llassert(PerHostRequestQueue_rat(*instance)->mQueuedRequests.empty());
llassert(PerServiceRequestQueue_rat(*instance)->mQueuedRequests.empty());
// Find the host and erase it from the map.
iterator const end = instance_map_w->end();
for(iterator iter = instance_map_w->begin(); iter != end; ++iter)
@@ -214,31 +218,31 @@ void AIPerHostRequestQueue::release(AIPerHostRequestQueuePtr& instance)
instance.reset();
}
bool AIPerHostRequestQueue::throttled() const
bool AIPerServiceRequestQueue::throttled() const
{
llassert(mAdded <= int(curl_concurrent_connections_per_host));
return mAdded == int(curl_concurrent_connections_per_host);
llassert(mAdded <= int(curl_concurrent_connections_per_service));
return mAdded == int(curl_concurrent_connections_per_service);
}
void AIPerHostRequestQueue::added_to_multi_handle(void)
void AIPerServiceRequestQueue::added_to_multi_handle(void)
{
llassert(mAdded < int(curl_concurrent_connections_per_host));
llassert(mAdded < int(curl_concurrent_connections_per_service));
++mAdded;
}
void AIPerHostRequestQueue::removed_from_multi_handle(void)
void AIPerServiceRequestQueue::removed_from_multi_handle(void)
{
--mAdded;
llassert(mAdded >= 0);
}
void AIPerHostRequestQueue::queue(AICurlEasyRequest const& easy_request)
void AIPerServiceRequestQueue::queue(AICurlEasyRequest const& easy_request)
{
mQueuedRequests.push_back(easy_request.get_ptr());
sTotalQueued++;
}
bool AIPerHostRequestQueue::cancel(AICurlEasyRequest const& easy_request)
bool AIPerServiceRequestQueue::cancel(AICurlEasyRequest const& easy_request)
{
queued_request_type::iterator const end = mQueuedRequests.end();
queued_request_type::iterator cur = std::find(mQueuedRequests.begin(), end, easy_request.get_ptr());
@@ -251,7 +255,7 @@ bool AIPerHostRequestQueue::cancel(AICurlEasyRequest const& easy_request)
// the back with swap (could just swap with the end immediately, but I don't
// want to break the order in which requests where added). Swap is also not
// thread-safe, but OK here because it only touches the objects in the deque,
// and the deque is protected by the lock on the AIPerHostRequestQueue object.
// and the deque is protected by the lock on the AIPerServiceRequestQueue object.
queued_request_type::iterator prev = cur;
while (++cur != end)
{
@@ -264,7 +268,7 @@ bool AIPerHostRequestQueue::cancel(AICurlEasyRequest const& easy_request)
return true;
}
void AIPerHostRequestQueue::add_queued_to(curlthread::MultiHandle* multi_handle)
void AIPerServiceRequestQueue::add_queued_to(curlthread::MultiHandle* multi_handle)
{
if (!mQueuedRequests.empty())
{
@@ -305,15 +309,15 @@ void AIPerHostRequestQueue::add_queued_to(curlthread::MultiHandle* multi_handle)
}
//static
void AIPerHostRequestQueue::purge(void)
void AIPerServiceRequestQueue::purge(void)
{
instance_map_wat instance_map_w(sInstanceMap);
for (iterator host = instance_map_w->begin(); host != instance_map_w->end(); ++host)
{
Dout(dc::curl, "Purging queue of host \"" << host->first << "\".");
PerHostRequestQueue_wat per_host_w(*host->second);
size_t s = per_host_w->mQueuedRequests.size();
per_host_w->mQueuedRequests.clear();
PerServiceRequestQueue_wat per_service_w(*host->second);
size_t s = per_service_w->mQueuedRequests.size();
per_service_w->mQueuedRequests.clear();
sTotalQueued -= s;
llassert(sTotalQueued >= 0);
}

View File

@@ -1,6 +1,6 @@
/**
* @file aicurlperservice.h
* @brief Definition of class AIPerHostRequestQueue
* @brief Definition of class AIPerServiceRequestQueue
*
* Copyright (c) 2012, 2013, Aleric Inglewood.
*
@@ -30,10 +30,14 @@
* 06/04/2013
* Renamed AIPrivate::PerHostRequestQueue[Ptr] to AIPerHostRequestQueue[Ptr]
* to allow public access.
*
* 09/04/2013
* Renamed everything "host" to "service" and use "hostname:port" as key
* instead of just "hostname".
*/
#ifndef AICURLPERHOST_H
#define AICURLPERHOST_H
#ifndef AICURLPERSERVICE_H
#define AICURLPERSERVICE_H
#include "llerror.h" // llassert
#include <string>
@@ -43,49 +47,49 @@
#include "aithreadsafe.h"
class AICurlEasyRequest;
class AIPerHostRequestQueue;
class AIPerServiceRequestQueue;
namespace AICurlPrivate {
namespace curlthread { class MultiHandle; }
class RefCountedThreadSafePerHostRequestQueue;
class RefCountedThreadSafePerServiceRequestQueue;
class ThreadSafeBufferedCurlEasyRequest;
// Forward declaration of BufferedCurlEasyRequestPtr (see aicurlprivate.h).
typedef boost::intrusive_ptr<ThreadSafeBufferedCurlEasyRequest> BufferedCurlEasyRequestPtr;
// AIPerHostRequestQueue objects are created by the curl thread and destructed by the main thread.
// AIPerServiceRequestQueue objects are created by the curl thread and destructed by the main thread.
// We need locking.
typedef AIThreadSafeSimpleDC<AIPerHostRequestQueue> threadsafe_PerHostRequestQueue;
typedef AIAccessConst<AIPerHostRequestQueue> PerHostRequestQueue_crat;
typedef AIAccess<AIPerHostRequestQueue> PerHostRequestQueue_rat;
typedef AIAccess<AIPerHostRequestQueue> PerHostRequestQueue_wat;
typedef AIThreadSafeSimpleDC<AIPerServiceRequestQueue> threadsafe_PerServiceRequestQueue;
typedef AIAccessConst<AIPerServiceRequestQueue> PerServiceRequestQueue_crat;
typedef AIAccess<AIPerServiceRequestQueue> PerServiceRequestQueue_rat;
typedef AIAccess<AIPerServiceRequestQueue> PerServiceRequestQueue_wat;
} // namespace AICurlPrivate
// We can't put threadsafe_PerHostRequestQueue in a std::map because you can't copy a mutex.
// We can't put threadsafe_PerServiceRequestQueue in a std::map because you can't copy a mutex.
// Therefore, use an intrusive pointer for the threadsafe type.
typedef boost::intrusive_ptr<AICurlPrivate::RefCountedThreadSafePerHostRequestQueue> AIPerHostRequestQueuePtr;
typedef boost::intrusive_ptr<AICurlPrivate::RefCountedThreadSafePerServiceRequestQueue> AIPerServiceRequestQueuePtr;
//-----------------------------------------------------------------------------
// AIPerHostRequestQueue
// AIPerServiceRequestQueue
// This class provides a static interface to create and maintain instances
// of AIPerHostRequestQueue objects, so that at any moment there is at most
// one instance per hostname. Those instances then are used to queue curl
// of AIPerServiceRequestQueue objects, so that at any moment there is at most
// one instance per hostname:port. Those instances then are used to queue curl
// requests when the maximum number of connections for that host already
// have been reached.
class AIPerHostRequestQueue {
class AIPerServiceRequestQueue {
private:
typedef std::map<std::string, AIPerHostRequestQueuePtr> instance_map_type;
typedef std::map<std::string, AIPerServiceRequestQueuePtr> instance_map_type;
typedef AIThreadSafeSimpleDC<instance_map_type> threadsafe_instance_map_type;
typedef AIAccess<instance_map_type> instance_map_rat;
typedef AIAccess<instance_map_type> instance_map_wat;
static threadsafe_instance_map_type sInstanceMap; // Map of AIPerHostRequestQueue instances with the hostname as key.
static threadsafe_instance_map_type sInstanceMap; // Map of AIPerServiceRequestQueue instances with the hostname as key.
friend class AIThreadSafeSimpleDC<AIPerHostRequestQueue>; //threadsafe_PerHostRequestQueue
AIPerHostRequestQueue(void) : mQueuedCommands(0), mAdded(0), mQueueEmpty(false), mQueueFull(false), mRequestStarvation(false) { }
friend class AIThreadSafeSimpleDC<AIPerServiceRequestQueue>; //threadsafe_PerServiceRequestQueue
AIPerServiceRequestQueue(void) : mQueuedCommands(0), mAdded(0), mQueueEmpty(false), mQueueFull(false), mRequestStarvation(false) { }
public:
typedef instance_map_type::iterator iterator;
@@ -95,10 +99,10 @@ class AIPerHostRequestQueue {
static std::string extract_canonical_servicename(std::string const& url);
// Return (possibly create) a unique instance for the given hostname.
static AIPerHostRequestQueuePtr instance(std::string const& hostname);
static AIPerServiceRequestQueuePtr instance(std::string const& servicename);
// Release instance (object will be deleted if this was the last instance).
static void release(AIPerHostRequestQueuePtr& instance);
static void release(AIPerServiceRequestQueuePtr& instance);
// Remove everything. Called upon viewer exit.
static void purge(void);
@@ -110,7 +114,7 @@ class AIPerHostRequestQueue {
int mAdded; // Number of active easy handles with this host.
queued_request_type mQueuedRequests; // Waiting (throttled) requests.
static LLAtomicS32 sTotalQueued; // The sum of mQueuedRequests.size() of all AIPerHostRequestQueue objects together.
static LLAtomicS32 sTotalQueued; // The sum of mQueuedRequests.size() of all AIPerServiceRequestQueue objects together.
bool mQueueEmpty; // Set to true when the queue becomes precisely empty.
bool mQueueFull; // Set to true when the queue is popped and then still isn't empty;
@@ -140,30 +144,30 @@ class AIPerHostRequestQueue {
// Returns true if curl can handle another request for this host.
// Should return false if the maximum allowed HTTP bandwidth is reached, or when
// the latency between request and actual delivery becomes too large.
static bool wantsMoreHTTPRequestsFor(AIPerHostRequestQueuePtr const& per_host, bool too_much_bandwidth);
static bool wantsMoreHTTPRequestsFor(AIPerServiceRequestQueuePtr const& per_service, bool too_much_bandwidth);
private:
// Disallow copying.
AIPerHostRequestQueue(AIPerHostRequestQueue const&) { }
AIPerServiceRequestQueue(AIPerServiceRequestQueue const&) { }
};
namespace AICurlPrivate {
class RefCountedThreadSafePerHostRequestQueue : public threadsafe_PerHostRequestQueue {
class RefCountedThreadSafePerServiceRequestQueue : public threadsafe_PerServiceRequestQueue {
public:
RefCountedThreadSafePerHostRequestQueue(void) : mReferenceCount(0) { }
RefCountedThreadSafePerServiceRequestQueue(void) : mReferenceCount(0) { }
bool exactly_two_left(void) const { return mReferenceCount == 2; }
private:
// Used by AIPerHostRequestQueuePtr. Object is deleted when reference count reaches zero.
// Used by AIPerServiceRequestQueuePtr. Object is deleted when reference count reaches zero.
LLAtomicU32 mReferenceCount;
friend void intrusive_ptr_add_ref(RefCountedThreadSafePerHostRequestQueue* p);
friend void intrusive_ptr_release(RefCountedThreadSafePerHostRequestQueue* p);
friend void intrusive_ptr_add_ref(RefCountedThreadSafePerServiceRequestQueue* p);
friend void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* p);
};
extern U32 curl_concurrent_connections_per_host;
extern U32 curl_concurrent_connections_per_service;
} // namespace AICurlPrivate
#endif // AICURLPERHOST_H
#endif // AICURLPERSERVICE_H

View File

@@ -305,7 +305,7 @@ class CurlEasyRequest : public CurlEasyHandle {
AIHTTPTimeoutPolicy const* mTimeoutPolicy;
std::string mLowercaseServicename; // Lowercase hostname:port (canonicalized) extracted from the url.
AIPerHostRequestQueuePtr mPerHostPtr; // Pointer to the corresponding AIPerHostRequestQueue.
AIPerServiceRequestQueuePtr mPerServicePtr; // Pointer to the corresponding AIPerServiceRequestQueue.
LLPointer<curlthread::HTTPTimeout> mTimeout;// Timeout administration object associated with last created CurlSocketInfo.
bool mTimeoutIsOrphan; // Set to true when mTimeout is not (yet) associated with a CurlSocketInfo.
#if defined(CWDEBUG) || defined(DEBUG_CURLIO)
@@ -348,10 +348,10 @@ class CurlEasyRequest : public CurlEasyHandle {
inline ThreadSafeBufferedCurlEasyRequest* get_lockobj(void);
inline ThreadSafeBufferedCurlEasyRequest const* get_lockobj(void) const;
// PerHost API.
AIPerHostRequestQueuePtr getPerHostPtr(void); // (Optionally create and) return a pointer to the unique
// AIPerHostRequestQueue corresponding to mLowercaseServicename.
bool removeFromPerHostQueue(AICurlEasyRequest const&) const; // Remove this request from the per-host queue, if queued at all.
// PerService API.
AIPerServiceRequestQueuePtr getPerServicePtr(void); // (Optionally create and) return a pointer to the unique
// AIPerServiceRequestQueue corresponding to mLowercaseServicename.
bool removeFromPerServiceQueue(AICurlEasyRequest const&) const; // Remove this request from the per-host queue, if queued at all.
// Returns true if it was queued.
protected:
// Pass events to parent.

View File

@@ -205,7 +205,7 @@ int ioctlsocket(int fd, int, unsigned long* nonblocking_enable)
namespace AICurlPrivate {
LLAtomicS32 max_pipelined_requests(32);
LLAtomicS32 max_pipelined_requests_per_host(8);
LLAtomicS32 max_pipelined_requests_per_service(8);
enum command_st {
cmd_none,
@@ -1332,11 +1332,11 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w)
case cmd_boost: // FIXME: future stuff
break;
case cmd_add:
PerHostRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerHostPtr())->removed_from_command_queue();
PerServiceRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue();
multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()));
break;
case cmd_remove:
PerHostRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerHostPtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'.
PerServiceRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'.
multi_handle_w->remove_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), true);
break;
}
@@ -1578,7 +1578,7 @@ void AICurlThread::run(void)
multi_handle_w->check_msg_queue();
}
// Clear the queued requests.
AIPerHostRequestQueue::purge();
AIPerServiceRequestQueue::purge();
}
AICurlMultiHandle::destroyInstance();
}
@@ -1705,17 +1705,17 @@ static U32 curl_max_total_concurrent_connections = 32; // Initialized on st
void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
{
bool throttled = true; // Default.
AIPerHostRequestQueuePtr per_host;
AIPerServiceRequestQueuePtr per_service;
{
AICurlEasyRequest_wat curl_easy_request_w(*easy_request);
per_host = curl_easy_request_w->getPerHostPtr();
PerHostRequestQueue_wat per_host_w(*per_host);
if (mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_host_w->throttled())
per_service = curl_easy_request_w->getPerServicePtr();
PerServiceRequestQueue_wat per_service_w(*per_service);
if (mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled())
{
curl_easy_request_w->set_timeout_opts();
if (curl_easy_request_w->add_handle_to_multi(curl_easy_request_w, mMultiHandle) == CURLM_OK)
{
per_host_w->added_to_multi_handle(); // (About to be) added to mAddedEasyRequests.
per_service_w->added_to_multi_handle(); // (About to be) added to mAddedEasyRequests.
throttled = false; // Fall through...
}
}
@@ -1731,7 +1731,7 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
return;
}
// The request could not be added, we have to queue it.
PerHostRequestQueue_wat(*per_host)->queue(easy_request);
PerServiceRequestQueue_wat(*per_service)->queue(easy_request);
#ifdef SHOW_ASSERT
// Not active yet, but it's no longer an error if next we try to remove the request.
AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = false;
@@ -1748,7 +1748,7 @@ CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request
#ifdef SHOW_ASSERT
bool removed =
#endif
easy_request_w->removeFromPerHostQueue(easy_request);
easy_request_w->removeFromPerServiceQueue(easy_request);
#ifdef SHOW_ASSERT
if (removed)
{
@@ -1764,12 +1764,12 @@ CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request
CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator const& iter, bool as_per_command)
{
CURLMcode res;
AIPerHostRequestQueuePtr per_host;
AIPerServiceRequestQueuePtr per_service;
{
AICurlEasyRequest_wat curl_easy_request_w(**iter);
res = curl_easy_request_w->remove_handle_from_multi(curl_easy_request_w, mMultiHandle);
per_host = curl_easy_request_w->getPerHostPtr();
PerHostRequestQueue_wat(*per_host)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests.
per_service = curl_easy_request_w->getPerServicePtr();
PerServiceRequestQueue_wat(*per_service)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests.
#ifdef SHOW_ASSERT
curl_easy_request_w->mRemovedPerCommand = as_per_command;
#endif
@@ -1786,7 +1786,7 @@ CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator cons
#endif
// Attempt to add a queued request, if any.
PerHostRequestQueue_wat(*per_host)->add_queued_to(this);
PerServiceRequestQueue_wat(*per_service)->add_queued_to(this);
return res;
}
@@ -2392,7 +2392,7 @@ void AICurlEasyRequest::addRequest(void)
command_queue_w->commands.push_back(Command(*this, cmd_add));
command_queue_w->size++;
AICurlEasyRequest_wat curl_easy_request_w(*get());
PerHostRequestQueue_wat(*curl_easy_request_w->getPerHostPtr())->added_to_command_queue();
PerServiceRequestQueue_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue();
curl_easy_request_w->add_queued();
}
// Something was added to the queue, wake up the thread to get it.
@@ -2456,7 +2456,7 @@ void AICurlEasyRequest::removeRequest(void)
command_queue_w->commands.push_back(Command(*this, cmd_remove));
command_queue_w->size--;
AICurlEasyRequest_wat curl_easy_request_w(*get());
PerHostRequestQueue_wat(*curl_easy_request_w->getPerHostPtr())->removed_from_command_queue(); // Note really, but this has the same effect as 'added a remove command'.
PerServiceRequestQueue_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(); // Note really, but this has the same effect as 'added a remove command'.
// Suppress warning that would otherwise happen if the callbacks are revoked before the curl thread removed the request.
curl_easy_request_w->remove_queued();
}
@@ -2468,7 +2468,7 @@ void AICurlEasyRequest::removeRequest(void)
namespace AICurlInterface {
void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentConnectionsPerHost, bool NoVerifySSLCert)
void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentConnectionsPerService, bool NoVerifySSLCert)
{
using namespace AICurlPrivate;
using namespace AICurlPrivate::curlthread;
@@ -2477,10 +2477,10 @@ void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentCo
// Cache Debug Settings.
curl_max_total_concurrent_connections = CurlMaxTotalConcurrentConnections;
curl_concurrent_connections_per_host = CurlConcurrentConnectionsPerHost;
curl_concurrent_connections_per_service = CurlConcurrentConnectionsPerService;
gNoVerifySSLCert = NoVerifySSLCert;
max_pipelined_requests = curl_max_total_concurrent_connections;
max_pipelined_requests_per_host = curl_concurrent_connections_per_host;
max_pipelined_requests_per_service = curl_concurrent_connections_per_service;
AICurlThread::sInstance = new AICurlThread;
AICurlThread::sInstance->start();
@@ -2498,14 +2498,14 @@ bool handleCurlMaxTotalConcurrentConnections(LLSD const& newvalue)
return true;
}
bool handleCurlConcurrentConnectionsPerHost(LLSD const& newvalue)
bool handleCurlConcurrentConnectionsPerService(LLSD const& newvalue)
{
using namespace AICurlPrivate;
U32 old = curl_concurrent_connections_per_host;
curl_concurrent_connections_per_host = newvalue.asInteger();
max_pipelined_requests_per_host += curl_concurrent_connections_per_host - old;
llinfos << "CurlConcurrentConnectionsPerHost set to " << curl_concurrent_connections_per_host << llendl;
U32 old = curl_concurrent_connections_per_service;
curl_concurrent_connections_per_service = newvalue.asInteger();
max_pipelined_requests_per_service += curl_concurrent_connections_per_service - old;
llinfos << "CurlConcurrentConnectionsPerService set to " << curl_concurrent_connections_per_service << llendl;
return true;
}
@@ -2525,7 +2525,7 @@ U32 getNumHTTPCommands(void)
U32 getNumHTTPQueued(void)
{
return AIPerHostRequestQueue::total_queued_size();
return AIPerServiceRequestQueue::total_queued_size();
}
U32 getNumHTTPAdded(void)
@@ -2551,16 +2551,16 @@ U32 getNumHTTPAdded(void)
// causes it to go through the states bs_reset, bs_initialize and then bs_multiplex with
// run state AICurlEasyRequestStateMachine_addRequest. Finally, in this state, multiplex
// calls AICurlEasyRequestStateMachine::multiplex_impl which then calls AICurlEasyRequest::addRequest
// which causes an increment of command_queue_w->size and AIPerHostRequestQueue::mQueuedCommands.
// which causes an increment of command_queue_w->size and AIPerServiceRequestQueue::mQueuedCommands.
//
// It is therefore guaranteed that in one loop of LLTextureFetchWorker::doWork,
// this size is incremented; stopping this function from returning true once we reached the
// threshold of "pipelines" requests (the sum of requests in the command queue, the ones
// throttled and queued in AIPerHostRequestQueue::mQueuedRequests and the already
// throttled and queued in AIPerServiceRequestQueue::mQueuedRequests and the already
// running requests (in MultiHandle::mAddedEasyRequests)).
//
//static
bool AIPerHostRequestQueue::wantsMoreHTTPRequestsFor(AIPerHostRequestQueuePtr const& per_host, bool too_much_bandwidth)
bool AIPerServiceRequestQueue::wantsMoreHTTPRequestsFor(AIPerServiceRequestQueuePtr const& per_service, bool too_much_bandwidth)
{
using namespace AICurlPrivate;
using namespace AICurlPrivate::curlthread;
@@ -2587,30 +2587,30 @@ bool AIPerHostRequestQueue::wantsMoreHTTPRequestsFor(AIPerHostRequestQueuePtr co
// Check if it's ok to get a new request for this particular host and update the per-host threshold.
// Atomic read max_pipelined_requests_per_host for the below calculations.
S32 const max_pipelined_requests_per_host_cache = max_pipelined_requests_per_host;
// Atomic read max_pipelined_requests_per_service for the below calculations.
S32 const max_pipelined_requests_per_service_cache = max_pipelined_requests_per_service;
{
PerHostRequestQueue_rat per_host_r(*per_host);
S32 const pipelined_requests_per_host = per_host_r->pipelined_requests();
reject = pipelined_requests_per_host >= max_pipelined_requests_per_host_cache;
equal = pipelined_requests_per_host == max_pipelined_requests_per_host_cache;
increment_threshold = per_host_r->mRequestStarvation;
decrement_threshold = per_host_r->mQueueFull && !per_host_r->mQueueEmpty;
PerServiceRequestQueue_rat per_service_r(*per_service);
S32 const pipelined_requests_per_service = per_service_r->pipelined_requests();
reject = pipelined_requests_per_service >= max_pipelined_requests_per_service_cache;
equal = pipelined_requests_per_service == max_pipelined_requests_per_service_cache;
increment_threshold = per_service_r->mRequestStarvation;
decrement_threshold = per_service_r->mQueueFull && !per_service_r->mQueueEmpty;
// Reset flags.
per_host_r->mQueueFull = per_host_r->mQueueEmpty = per_host_r->mRequestStarvation = false;
per_service_r->mQueueFull = per_service_r->mQueueEmpty = per_service_r->mRequestStarvation = false;
}
if (decrement_threshold)
{
if (max_pipelined_requests_per_host_cache > curl_concurrent_connections_per_host)
if (max_pipelined_requests_per_service_cache > curl_concurrent_connections_per_service)
{
--max_pipelined_requests_per_host;
--max_pipelined_requests_per_service;
}
}
else if (increment_threshold && reject)
{
if (max_pipelined_requests_per_host_cache < 2 * curl_concurrent_connections_per_host)
if (max_pipelined_requests_per_service_cache < 2 * curl_concurrent_connections_per_service)
{
max_pipelined_requests_per_host++;
max_pipelined_requests_per_service++;
// Immediately take the new threshold into account.
reject = !equal;
}
@@ -2640,7 +2640,7 @@ bool AIPerHostRequestQueue::wantsMoreHTTPRequestsFor(AIPerHostRequestQueuePtr co
// here instead.
// The maximum number of requests that may be queued in command_queue is equal to the total number of requests
// that may exist in the pipeline minus the number of requests queued in AIPerHostRequestQueue objects, minus
// that may exist in the pipeline minus the number of requests queued in AIPerServiceRequestQueue objects, minus
// the number of already running requests.
reject = pipelined_requests >= max_pipelined_requests_cache;
equal = pipelined_requests == max_pipelined_requests_cache;

View File

@@ -81,7 +81,7 @@ class MultiHandle : public CurlMultiHandle
// Store result and trigger events for easy request.
void finish_easy_request(AICurlEasyRequest const& easy_request, CURLcode result);
// Remove easy request at iter (must exist).
// Note that it's possible that a new request from a AIPerHostRequestQueue::mQueuedRequests is inserted before iter.
// Note that it's possible that a new request from a AIPerServiceRequestQueue::mQueuedRequests is inserted before iter.
CURLMcode remove_easy_request(addedEasyRequests_type::iterator const& iter, bool as_per_command);
static int socket_callback(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp);

View File

@@ -4448,10 +4448,10 @@ This should be as low as possible, but too low may break functionality</string>
<key>Value</key>
<integer>64</integer>
</map>
<key>CurlConcurrentConnectionsPerHost</key>
<key>CurlConcurrentConnectionsPerService</key>
<map>
<key>Comment</key>
<string>Maximum number of simultaneous curl connections per host</string>
<string>Maximum number of simultaneous curl connections per host:port service</string>
<key>Persist</key>
<integer>0</integer>
<key>Type</key>

View File

@@ -1911,7 +1911,7 @@ bool LLAppViewer::initThreads()
startEngineThread();
AICurlInterface::startCurlThread(gSavedSettings.getU32("CurlMaxTotalConcurrentConnections"),
gSavedSettings.getU32("CurlConcurrentConnectionsPerHost"),
gSavedSettings.getU32("CurlConcurrentConnectionsPerService"),
gSavedSettings.getBOOL("NoVerifySSLCert"));
LLImage::initClass();

View File

@@ -253,7 +253,7 @@ private:
LLUUID mID;
LLHost mHost;
std::string mUrl;
AIPerHostRequestQueuePtr mPerHostPtr; // Pointer to the AIPerHostRequestQueue corresponding to the host of mUrl.
AIPerServiceRequestQueuePtr mPerServicePtr; // Pointer to the AIPerServiceRequestQueue corresponding to the host of mUrl.
U8 mType;
F32 mImagePriority;
U32 mWorkPriority;
@@ -799,11 +799,11 @@ LLTextureFetchWorker::LLTextureFetchWorker(LLTextureFetch* fetcher,
if (!mCanUseNET)
{
// Probably a file://, but well; in that case servicename will be empty.
std::string servicename = AIPerHostRequestQueue::extract_canonical_servicename(mUrl);
std::string servicename = AIPerServiceRequestQueue::extract_canonical_servicename(mUrl);
if (!servicename.empty())
{
// Make sure mPerHostPtr is up to date with mUrl.
mPerHostPtr = AIPerHostRequestQueue::instance(servicename);
// Make sure mPerServicePtr is up to date with mUrl.
mPerServicePtr = AIPerServiceRequestQueue::instance(servicename);
}
}
@@ -1162,7 +1162,7 @@ bool LLTextureFetchWorker::doWork(S32 param)
{
mUrl = http_url + "/?texture_id=" + mID.asString().c_str();
mWriteToCacheState = CAN_WRITE ; //because this texture has a fixed texture id.
mPerHostPtr = AIPerHostRequestQueue::instance(AIPerHostRequestQueue::extract_canonical_servicename(http_url));
mPerServicePtr = AIPerServiceRequestQueue::instance(AIPerServiceRequestQueue::extract_canonical_servicename(http_url));
}
else
{
@@ -1272,7 +1272,7 @@ bool LLTextureFetchWorker::doWork(S32 param)
// Let AICurl decide if we can process more HTTP requests at the moment or not.
static const LLCachedControl<F32> throttle_bandwidth("HTTPThrottleBandwidth", 2000);
if (!AIPerHostRequestQueue::wantsMoreHTTPRequestsFor(mPerHostPtr, mFetcher->getTextureBandwidth() > throttle_bandwidth))
if (!AIPerServiceRequestQueue::wantsMoreHTTPRequestsFor(mPerServicePtr, mFetcher->getTextureBandwidth() > throttle_bandwidth))
{
return false ; //wait.
}

View File

@@ -792,7 +792,7 @@ void settings_setup_listeners()
gSavedSettings.getControl("AscentAvatarZModifier")->getSignal()->connect(boost::bind(&handleAscentAvatarModifier, _2));
gSavedSettings.getControl("CurlMaxTotalConcurrentConnections")->getSignal()->connect(boost::bind(&AICurlInterface::handleCurlMaxTotalConcurrentConnections, _2));
gSavedSettings.getControl("CurlConcurrentConnectionsPerHost")->getSignal()->connect(boost::bind(&AICurlInterface::handleCurlConcurrentConnectionsPerHost, _2));
gSavedSettings.getControl("CurlConcurrentConnectionsPerService")->getSignal()->connect(boost::bind(&AICurlInterface::handleCurlConcurrentConnectionsPerService, _2));
gSavedSettings.getControl("NoVerifySSLCert")->getSignal()->connect(boost::bind(&AICurlInterface::handleNoVerifySSLCert, _2));
gSavedSettings.getControl("CurlTimeoutDNSLookup")->getValidateSignal()->connect(boost::bind(&validateCurlTimeoutDNSLookup, _2));