Merge remote-tracking branch 'aleric/master'

Conflicts:
	indra/newview/lltexturefetch.cpp
	indra/newview/llviewerwindow.cpp
This commit is contained in:
Latif Khalifa
2013-04-23 12:13:54 +02:00
22 changed files with 857 additions and 543 deletions

View File

@@ -251,7 +251,7 @@ U64 totalTime()
}
else
{
if (current_clock_count >= gLastTotalTimeClockCount)
if (LL_LIKELY(current_clock_count >= gLastTotalTimeClockCount))
{
// No wrapping, we're all okay.
gTotalTimeClockCount += current_clock_count - gLastTotalTimeClockCount;

View File

@@ -392,7 +392,7 @@ bool LLCrashLogger::init()
// Start curl thread.
AICurlInterface::startCurlThread(64, // CurlMaxTotalConcurrentConnections
8, // CurlConcurrentConnectionsPerHost
8, // CurlConcurrentConnectionsPerService
true); // NoVerifySSLCert
// We assume that all the logs we're looking for reside on the current drive

View File

@@ -24,6 +24,7 @@
* $/LicenseInfo$
*/
#include "sys.h"
#include "llmath.h"
static LL_ALIGN_16(const F32 M_IDENT_3A[12]) =

View File

@@ -24,7 +24,7 @@ include_directories(
set(llmessage_SOURCE_FILES
aicurl.cpp
aicurleasyrequeststatemachine.cpp
aicurlperhost.cpp
aicurlperservice.cpp
aicurlthread.cpp
aihttpheaders.cpp
aihttptimeout.cpp
@@ -111,7 +111,7 @@ set(llmessage_HEADER_FILES
aicurl.h
aicurleasyrequeststatemachine.h
aicurlperhost.h
aicurlperservice.h
aicurlprivate.h
aicurlthread.h
aihttpheaders.h

View File

@@ -58,7 +58,7 @@
#include "aihttpheaders.h"
#include "aihttptimeoutpolicy.h"
#include "aicurleasyrequeststatemachine.h"
#include "aicurlperhost.h"
#include "aicurlperservice.h"
//==================================================================================
// Debug Settings
@@ -298,6 +298,7 @@ LLAtomicU32 Stats::easy_init_errors;
LLAtomicU32 Stats::easy_cleanup_calls;
LLAtomicU32 Stats::multi_calls;
LLAtomicU32 Stats::multi_errors;
LLAtomicU32 Stats::running_handles;
LLAtomicU32 Stats::AICurlEasyRequest_count;
LLAtomicU32 Stats::AICurlEasyRequestStateMachine_count;
LLAtomicU32 Stats::BufferedCurlEasyRequest_count;
@@ -460,6 +461,12 @@ void setCAPath(std::string const& path)
CertificateAuthority_w->path = path;
}
// THREAD-SAFE
U32 getNumHTTPRunning(void)
{
return Stats::running_handles;
}
//static
void Stats::print(void)
{
@@ -952,9 +959,9 @@ CurlEasyRequest::~CurlEasyRequest()
// be available anymore.
send_handle_events_to(NULL);
revokeCallbacks();
if (mPerHostPtr)
if (mPerServicePtr)
{
PerHostRequestQueue::release(mPerHostPtr);
AIPerServiceRequestQueue::release(mPerServicePtr);
}
// This wasn't freed yet if the request never finished.
curl_slist_free_all(mHeaders);
@@ -1084,56 +1091,6 @@ void CurlEasyRequest::applyDefaultOptions(void)
);
}
// url must be of the form
// (see http://www.ietf.org/rfc/rfc3986.txt Appendix A for definitions not given here):
//
// url = sheme ":" hier-part [ "?" query ] [ "#" fragment ]
// hier-part = "//" authority path-abempty
// authority = [ userinfo "@" ] host [ ":" port ]
// path-abempty = *( "/" segment )
//
// That is, a hier-part of the form '/ path-absolute', '/ path-rootless' or
// '/ path-empty' is NOT allowed here. This should be safe because we only
// call this function for curl access, any file access would use APR.
//
// However, as a special exception, this function allows:
//
// url = authority path-abempty
//
// without the 'sheme ":" "//"' parts.
//
// As follows from the ABNF (see RFC, Appendix A):
// - authority is either terminated by a '/' or by the end of the string because
// neither userinfo, host nor port may contain a '/'.
// - userinfo does not contain a '@', and if it exists, is always terminated by a '@'.
// - port does not contain a ':', and if it exists is always prepended by a ':'.
//
// Only called by CurlEasyRequest::finalizeRequest.
static std::string extract_canonical_hostname(std::string const& url)
{
std::string::size_type pos;
std::string::size_type authority = 0; // Default if there is no sheme.
if ((pos = url.find("://")) != url.npos && pos < url.find('/')) authority = pos + 3; // Skip the "sheme://" if any, the second find is to avoid finding a "://" as part of path-abempty.
std::string::size_type host = authority; // Default if there is no userinfo.
if ((pos = url.find('@', authority)) != url.npos) host = pos + 1; // Skip the "userinfo@" if any.
authority = url.length() - 1; // Default last character of host if there is no path-abempty.
if ((pos = url.find('/', host)) != url.npos) authority = pos - 1; // Point to last character of host.
std::string::size_type len = url.find_last_not_of(":0123456789", authority) - host + 1; // Skip trailing ":port", if any.
std::string hostname(url, host, len);
#if APR_CHARSET_EBCDIC
#error Not implemented
#else
// Convert hostname to lowercase in a way that we compare two hostnames equal iff libcurl does.
for (std::string::iterator iter = hostname.begin(); iter != hostname.end(); ++iter)
{
int c = *iter;
if (c >= 'A' && c <= 'Z')
*iter = c + ('a' - 'A');
}
#endif
return hostname;
}
void CurlEasyRequest::finalizeRequest(std::string const& url, AIHTTPTimeoutPolicy const& policy, AICurlEasyRequestStateMachine* state_machine)
{
DoutCurlEntering("CurlEasyRequest::finalizeRequest(\"" << url << "\", " << policy.name() << ", " << (void*)state_machine << ")");
@@ -1156,8 +1113,8 @@ void CurlEasyRequest::finalizeRequest(std::string const& url, AIHTTPTimeoutPolic
#endif
setopt(CURLOPT_HTTPHEADER, mHeaders);
setoptString(CURLOPT_URL, url);
llassert(!mPerHostPtr);
mLowercaseHostname = extract_canonical_hostname(url);
llassert(!mPerServicePtr);
mLowercaseServicename = AIPerServiceRequestQueue::extract_canonical_servicename(url);
mTimeoutPolicy = &policy;
state_machine->setTotalDelayTimeout(policy.getTotalDelay());
// The following line is a bit tricky: we store a pointer to the object without increasing its reference count.
@@ -1183,7 +1140,7 @@ void CurlEasyRequest::finalizeRequest(std::string const& url, AIHTTPTimeoutPolic
// // get less connect time, while it still (also) has to wait for this DNS lookup.
void CurlEasyRequest::set_timeout_opts(void)
{
setopt(CURLOPT_CONNECTTIMEOUT, mTimeoutPolicy->getConnectTimeout(mLowercaseHostname));
setopt(CURLOPT_CONNECTTIMEOUT, mTimeoutPolicy->getConnectTimeout(getLowercaseHostname()));
setopt(CURLOPT_TIMEOUT, mTimeoutPolicy->getCurlTransaction());
}
@@ -1279,22 +1236,27 @@ void CurlEasyRequest::queued_for_removal(AICurlEasyRequest_wat& curl_easy_reques
}
#endif
PerHostRequestQueuePtr CurlEasyRequest::getPerHostPtr(void)
AIPerServiceRequestQueuePtr CurlEasyRequest::getPerServicePtr(void)
{
if (!mPerHostPtr)
if (!mPerServicePtr)
{
// mPerHostPtr is really just a speed-up cache.
// The reason we can cache it is because mLowercaseHostname is only set
// mPerServicePtr is really just a speed-up cache.
// The reason we can cache it is because mLowercaseServicename is only set
// in finalizeRequest which may only be called once: it never changes.
mPerHostPtr = PerHostRequestQueue::instance(mLowercaseHostname);
mPerServicePtr = AIPerServiceRequestQueue::instance(mLowercaseServicename);
}
return mPerHostPtr;
return mPerServicePtr;
}
bool CurlEasyRequest::removeFromPerHostQueue(AICurlEasyRequest const& easy_request) const
bool CurlEasyRequest::removeFromPerServiceQueue(AICurlEasyRequest const& easy_request) const
{
// Note that easy_request (must) represent(s) this object; it's just passed for convenience.
return mPerHostPtr && PerHostRequestQueue_wat(*mPerHostPtr)->cancel(easy_request);
return mPerServicePtr && PerServiceRequestQueue_wat(*mPerServicePtr)->cancel(easy_request);
}
std::string CurlEasyRequest::getLowercaseHostname(void) const
{
return mLowercaseServicename.substr(0, mLowercaseServicename.find_last_of(':'));
}
//-----------------------------------------------------------------------------

View File

@@ -52,6 +52,7 @@
#include "stdtypes.h" // U16, S32, U32, F64
#include "llatomic.h" // LLAtomicU32
#include "aithreadsafe.h"
#include "aicurlperservice.h" // AIPerServiceRequestQueuePtr
// Debug Settings.
extern bool gNoVerifySSLCert;
@@ -133,6 +134,7 @@ struct Stats {
static LLAtomicU32 easy_cleanup_calls;
static LLAtomicU32 multi_calls;
static LLAtomicU32 multi_errors;
static LLAtomicU32 running_handles;
static LLAtomicU32 AICurlEasyRequest_count;
static LLAtomicU32 AICurlEasyRequestStateMachine_count;
static LLAtomicU32 BufferedCurlEasyRequest_count;
@@ -153,7 +155,7 @@ struct Stats {
// Called to handle changes in Debug Settings.
bool handleCurlMaxTotalConcurrentConnections(LLSD const& newvalue);
bool handleCurlConcurrentConnectionsPerHost(LLSD const& newvalue);
bool handleCurlConcurrentConnectionsPerService(LLSD const& newvalue);
bool handleNoVerifySSLCert(LLSD const& newvalue);
// Called once at start of application (from newview/llappviewer.cpp by main thread (before threads are created)),
@@ -161,7 +163,7 @@ bool handleNoVerifySSLCert(LLSD const& newvalue);
void initCurl(void);
// Called once at start of application (from LLAppViewer::initThreads), starts AICurlThread.
void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentConnectionsPerHost, bool NoVerifySSLCert);
void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentConnectionsPerService, bool NoVerifySSLCert);
// Called once at the end of application before terminating other threads (most notably the texture thread workers)
// with the purpose to stop the curl thread from doing any call backs to running responders: the responders sometimes
@@ -185,6 +187,19 @@ void setCAFile(std::string const& file);
// Can be used to set the path to the Certificate Authority file.
void setCAPath(std::string const& file);
// Returns number of queued 'add' commands minus the number of queued 'remove' commands.
U32 getNumHTTPCommands(void);
// Returns the number of queued requests.
U32 getNumHTTPQueued(void);
// Returns the number of curl requests currently added to the multi handle.
U32 getNumHTTPAdded(void);
// This used to be LLAppViewer::getTextureFetch()->getNumHTTPRequests().
// Returns the number of active curl easy handles (that are actually attempting to download something).
U32 getNumHTTPRunning(void);
} // namespace AICurlInterface
// Forward declaration (see aicurlprivate.h).

View File

@@ -1,175 +0,0 @@
/**
* @file aiperhost.cpp
* @brief Implementation of PerHostRequestQueue
*
* Copyright (c) 2012, Aleric Inglewood.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* There are special exceptions to the terms and conditions of the GPL as
* it is applied to this Source Code. View the full text of the exception
* in the file doc/FLOSS-exception.txt in this software distribution.
*
* CHANGELOG
* and additional copyright holders.
*
* 04/11/2012
* Initial version, written by Aleric Inglewood @ SL
*/
#include "sys.h"
#include "aicurlperhost.h"
#include "aicurlthread.h"
#undef AICurlPrivate
namespace AICurlPrivate {
PerHostRequestQueue::threadsafe_instance_map_type PerHostRequestQueue::sInstanceMap;
U32 curl_concurrent_connections_per_host;
//static
PerHostRequestQueuePtr PerHostRequestQueue::instance(std::string const& hostname)
{
llassert(!hostname.empty());
instance_map_wat instance_map_w(sInstanceMap);
PerHostRequestQueue::iterator iter = instance_map_w->find(hostname);
if (iter == instance_map_w->end())
{
iter = instance_map_w->insert(instance_map_type::value_type(hostname, new RefCountedThreadSafePerHostRequestQueue)).first;
}
// Note: the creation of PerHostRequestQueuePtr MUST be protected by the lock on sInstanceMap (see release()).
return iter->second;
}
//static
void PerHostRequestQueue::release(PerHostRequestQueuePtr& instance)
{
if (instance->exactly_two_left()) // Being 'instance' and the one in sInstanceMap.
{
// The viewer can be have left main() we can't access the global sInstanceMap anymore.
if (LLApp::isStopped())
{
return;
}
instance_map_wat instance_map_w(sInstanceMap);
// It is possible that 'exactly_two_left' is not up to date anymore.
// Therefore, recheck the condition now that we have locked sInstanceMap.
if (!instance->exactly_two_left())
{
// Some other thread added this host in the meantime.
return;
}
// The reference in the map is the last one; that means there can't be any curl easy requests queued for this host.
llassert(PerHostRequestQueue_wat(*instance)->mQueuedRequests.empty());
// Find the host and erase it from the map.
iterator const end = instance_map_w->end();
for(iterator iter = instance_map_w->begin(); iter != end; ++iter)
{
if (instance == iter->second)
{
instance_map_w->erase(iter);
instance.reset();
return;
}
}
// We should always find the host.
llassert(false);
}
instance.reset();
}
bool PerHostRequestQueue::throttled() const
{
llassert(mAdded <= int(curl_concurrent_connections_per_host));
return mAdded == int(curl_concurrent_connections_per_host);
}
void PerHostRequestQueue::added_to_multi_handle(void)
{
llassert(mAdded < int(curl_concurrent_connections_per_host));
++mAdded;
}
void PerHostRequestQueue::removed_from_multi_handle(void)
{
--mAdded;
llassert(mAdded >= 0);
}
void PerHostRequestQueue::queue(AICurlEasyRequest const& easy_request)
{
mQueuedRequests.push_back(easy_request.get_ptr());
}
bool PerHostRequestQueue::cancel(AICurlEasyRequest const& easy_request)
{
queued_request_type::iterator const end = mQueuedRequests.end();
queued_request_type::iterator cur = std::find(mQueuedRequests.begin(), end, easy_request.get_ptr());
if (cur == end)
return false; // Not found.
// We can't use erase because that uses assignment to move elements,
// because it isn't thread-safe. Therefore, move the element that we found to
// the back with swap (could just swap with the end immediately, but I don't
// want to break the order in which requests where added). Swap is also not
// thread-safe, but OK here because it only touches the objects in the deque,
// and the deque is protected by the lock on the PerHostRequestQueue object.
queued_request_type::iterator prev = cur;
while (++cur != end)
{
prev->swap(*cur); // This is safe,
prev = cur;
}
mQueuedRequests.pop_back(); // if this is safe.
return true;
}
void PerHostRequestQueue::add_queued_to(curlthread::MultiHandle* multi_handle)
{
if (!mQueuedRequests.empty())
{
multi_handle->add_easy_request(mQueuedRequests.front());
mQueuedRequests.pop_front();
}
}
//static
void PerHostRequestQueue::purge(void)
{
instance_map_wat instance_map_w(sInstanceMap);
for (iterator host = instance_map_w->begin(); host != instance_map_w->end(); ++host)
{
Dout(dc::curl, "Purging queue of host \"" << host->first << "\".");
PerHostRequestQueue_wat(*host->second)->mQueuedRequests.clear();
}
}
// Friend functions of RefCountedThreadSafePerHostRequestQueue
void intrusive_ptr_add_ref(RefCountedThreadSafePerHostRequestQueue* per_host)
{
per_host->mReferenceCount++;
}
void intrusive_ptr_release(RefCountedThreadSafePerHostRequestQueue* per_host)
{
if (--per_host->mReferenceCount == 0)
{
delete per_host;
}
}
} // namespace AICurlPrivate

View File

@@ -1,135 +0,0 @@
/**
* @file aicurlperhost.h
* @brief Definition of class PerHostRequestQueue
*
* Copyright (c) 2012, Aleric Inglewood.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* There are special exceptions to the terms and conditions of the GPL as
* it is applied to this Source Code. View the full text of the exception
* in the file doc/FLOSS-exception.txt in this software distribution.
*
* CHANGELOG
* and additional copyright holders.
*
* 04/11/2012
* Initial version, written by Aleric Inglewood @ SL
*/
#ifndef AICURLPERHOST_H
#define AICURLPERHOST_H
#include "llerror.h" // llassert
#include <string>
#include <deque>
#include <map>
#include <boost/intrusive_ptr.hpp>
#include "aithreadsafe.h"
class AICurlEasyRequest;
namespace AICurlPrivate {
namespace curlthread { class MultiHandle; }
class PerHostRequestQueue;
class RefCountedThreadSafePerHostRequestQueue;
class ThreadSafeBufferedCurlEasyRequest;
// Forward declaration of BufferedCurlEasyRequestPtr (see aicurlprivate.h).
typedef boost::intrusive_ptr<ThreadSafeBufferedCurlEasyRequest> BufferedCurlEasyRequestPtr;
// PerHostRequestQueue objects are created by the curl thread and destructed by the main thread.
// We need locking.
typedef AIThreadSafeSimpleDC<PerHostRequestQueue> threadsafe_PerHostRequestQueue;
typedef AIAccessConst<PerHostRequestQueue> PerHostRequestQueue_crat;
typedef AIAccess<PerHostRequestQueue> PerHostRequestQueue_rat;
typedef AIAccess<PerHostRequestQueue> PerHostRequestQueue_wat;
// We can't put threadsafe_PerHostRequestQueue in a std::map because you can't copy a mutex.
// Therefore, use an intrusive pointer for the threadsafe type.
typedef boost::intrusive_ptr<RefCountedThreadSafePerHostRequestQueue> PerHostRequestQueuePtr;
//-----------------------------------------------------------------------------
// PerHostRequestQueue
// This class provides a static interface to create and maintain instances
// of PerHostRequestQueue objects, so that at any moment there is at most
// one instance per hostname. Those instances then are used to queue curl
// requests when the maximum number of connections for that host already
// have been reached.
class PerHostRequestQueue {
private:
typedef std::map<std::string, PerHostRequestQueuePtr> instance_map_type;
typedef AIThreadSafeSimpleDC<instance_map_type> threadsafe_instance_map_type;
typedef AIAccess<instance_map_type> instance_map_rat;
typedef AIAccess<instance_map_type> instance_map_wat;
static threadsafe_instance_map_type sInstanceMap; // Map of PerHostRequestQueue instances with the hostname as key.
friend class AIThreadSafeSimpleDC<PerHostRequestQueue>; //threadsafe_PerHostRequestQueue
PerHostRequestQueue(void) : mAdded(0) { }
public:
typedef instance_map_type::iterator iterator;
typedef instance_map_type::const_iterator const_iterator;
// Return (possibly create) a unique instance for the given hostname.
static PerHostRequestQueuePtr instance(std::string const& hostname);
// Release instance (object will be deleted if this was the last instance).
static void release(PerHostRequestQueuePtr& instance);
// Remove everything. Called upon viewer exit.
static void purge(void);
private:
typedef std::deque<BufferedCurlEasyRequestPtr> queued_request_type;
int mAdded; // Number of active easy handles with this host.
queued_request_type mQueuedRequests; // Waiting (throttled) requests.
public:
void added_to_multi_handle(void); // Called when an easy handle for this host has been added to the multi handle.
void removed_from_multi_handle(void); // Called when an easy handle for this host is removed again from the multi handle.
bool throttled(void) const; // Returns true if the maximum number of allowed requests for this host have been added to the multi handle.
void queue(AICurlEasyRequest const& easy_request); // Add easy_request to the queue.
bool cancel(AICurlEasyRequest const& easy_request); // Remove easy_request from the queue (if it's there).
void add_queued_to(curlthread::MultiHandle* mh); // Add queued easy handle (if any) to the multi handle. The request is removed from the queue,
// followed by either a call to added_to_multi_handle() or to queue() to add it back.
private:
// Disallow copying.
PerHostRequestQueue(PerHostRequestQueue const&) { }
};
class RefCountedThreadSafePerHostRequestQueue : public threadsafe_PerHostRequestQueue {
public:
RefCountedThreadSafePerHostRequestQueue(void) : mReferenceCount(0) { }
bool exactly_two_left(void) const { return mReferenceCount == 2; }
private:
// Used by PerHostRequestQueuePtr. Object is deleted when reference count reaches zero.
LLAtomicU32 mReferenceCount;
friend void intrusive_ptr_add_ref(RefCountedThreadSafePerHostRequestQueue* p);
friend void intrusive_ptr_release(RefCountedThreadSafePerHostRequestQueue* p);
};
extern U32 curl_concurrent_connections_per_host;
} // namespace AICurlPrivate
#endif // AICURLPERHOST_H

View File

@@ -0,0 +1,325 @@
/**
* @file aiperservice.cpp
* @brief Implementation of AIPerServiceRequestQueue
*
* Copyright (c) 2012, 2013, Aleric Inglewood.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* There are special exceptions to the terms and conditions of the GPL as
* it is applied to this Source Code. View the full text of the exception
* in the file doc/FLOSS-exception.txt in this software distribution.
*
* CHANGELOG
* and additional copyright holders.
*
* 04/11/2012
* Initial version, written by Aleric Inglewood @ SL
*
* 06/04/2013
* Renamed AICurlPrivate::PerHostRequestQueue[Ptr] to AIPerHostRequestQueue[Ptr]
* to allow public access.
*
* 09/04/2013
* Renamed everything "host" to "service" and use "hostname:port" as key
* instead of just "hostname".
*/
#include "sys.h"
#include "aicurlperservice.h"
#include "aicurlthread.h"
AIPerServiceRequestQueue::threadsafe_instance_map_type AIPerServiceRequestQueue::sInstanceMap;
LLAtomicS32 AIPerServiceRequestQueue::sTotalQueued;
bool AIPerServiceRequestQueue::sQueueEmpty;
bool AIPerServiceRequestQueue::sQueueFull;
bool AIPerServiceRequestQueue::sRequestStarvation;
#undef AICurlPrivate
namespace AICurlPrivate {
U32 curl_concurrent_connections_per_service;
// Friend functions of RefCountedThreadSafePerServiceRequestQueue
void intrusive_ptr_add_ref(RefCountedThreadSafePerServiceRequestQueue* per_service)
{
per_service->mReferenceCount++;
}
void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* per_service)
{
if (--per_service->mReferenceCount == 0)
{
delete per_service;
}
}
} // namespace AICurlPrivate
using namespace AICurlPrivate;
// url must be of the form
// (see http://www.ietf.org/rfc/rfc3986.txt Appendix A for definitions not given here):
//
// url = sheme ":" hier-part [ "?" query ] [ "#" fragment ]
// hier-part = "//" authority path-abempty
// authority = [ userinfo "@" ] host [ ":" port ]
// path-abempty = *( "/" segment )
//
// That is, a hier-part of the form '/ path-absolute', '/ path-rootless' or
// '/ path-empty' is NOT allowed here. This should be safe because we only
// call this function for curl access, any file access would use APR.
//
// However, as a special exception, this function allows:
//
// url = authority path-abempty
//
// without the 'sheme ":" "//"' parts.
//
// As follows from the ABNF (see RFC, Appendix A):
// - authority is either terminated by a '/' or by the end of the string because
// neither userinfo, host nor port may contain a '/'.
// - userinfo does not contain a '@', and if it exists, is always terminated by a '@'.
// - port does not contain a ':', and if it exists is always prepended by a ':'.
//
//static
std::string AIPerServiceRequestQueue::extract_canonical_servicename(std::string const& url)
{
char const* p = url.data();
char const* const end = p + url.size();
char const* sheme_colon = NULL;
char const* sheme_slash = NULL;
char const* first_ampersand = NULL;
char const* port_colon = NULL;
std::string servicename;
char const* hostname = p; // Default in the case there is no "sheme://userinfo@".
while (p < end)
{
int c = *p;
if (c == ':')
{
if (!port_colon && std::isdigit(p[1]))
{
port_colon = p;
}
else if (!sheme_colon && !sheme_slash && !first_ampersand && !port_colon)
{
// Found a colon before any slash or ampersand: this has to be the colon between the sheme and the hier-part.
sheme_colon = p;
}
}
else if (c == '/')
{
if (!sheme_slash && sheme_colon && sheme_colon == p - 1 && !first_ampersand && p[1] == '/')
{
// Found the first '/' in the first occurance of the sequence "://".
sheme_slash = p;
hostname = ++p + 1; // Point hostname to the start of the authority, the default when there is no "userinfo@" part.
servicename.clear(); // Remove the sheme.
}
else
{
// Found slash that is not part of the "sheme://" string. Signals end of authority.
// We're done.
break;
}
}
else if (c == '@')
{
if (!first_ampersand)
{
first_ampersand = p;
hostname = p + 1;
servicename.clear(); // Remove the "userinfo@"
}
}
if (p >= hostname)
{
// Convert hostname to lowercase in a way that we compare two hostnames equal iff libcurl does.
#if APR_CHARSET_EBCDIC
#error Not implemented
#else
if (c >= 'A' && c <= 'Z')
c += ('a' - 'A');
#endif
servicename += c;
}
++p;
}
// Strip of any trailing ":80".
if (p - 3 == port_colon && p[-1] == '0' && p[-2] == '8')
{
return servicename.substr(0, p - hostname - 3);
}
return servicename;
}
//static
AIPerServiceRequestQueuePtr AIPerServiceRequestQueue::instance(std::string const& servicename)
{
llassert(!servicename.empty());
instance_map_wat instance_map_w(sInstanceMap);
AIPerServiceRequestQueue::iterator iter = instance_map_w->find(servicename);
if (iter == instance_map_w->end())
{
iter = instance_map_w->insert(instance_map_type::value_type(servicename, new RefCountedThreadSafePerServiceRequestQueue)).first;
}
// Note: the creation of AIPerServiceRequestQueuePtr MUST be protected by the lock on sInstanceMap (see release()).
return iter->second;
}
//static
void AIPerServiceRequestQueue::release(AIPerServiceRequestQueuePtr& instance)
{
if (instance->exactly_two_left()) // Being 'instance' and the one in sInstanceMap.
{
// The viewer can be have left main() we can't access the global sInstanceMap anymore.
if (LLApp::isStopped())
{
return;
}
instance_map_wat instance_map_w(sInstanceMap);
// It is possible that 'exactly_two_left' is not up to date anymore.
// Therefore, recheck the condition now that we have locked sInstanceMap.
if (!instance->exactly_two_left())
{
// Some other thread added this host in the meantime.
return;
}
// The reference in the map is the last one; that means there can't be any curl easy requests queued for this host.
llassert(PerServiceRequestQueue_rat(*instance)->mQueuedRequests.empty());
// Find the host and erase it from the map.
iterator const end = instance_map_w->end();
for(iterator iter = instance_map_w->begin(); iter != end; ++iter)
{
if (instance == iter->second)
{
instance_map_w->erase(iter);
instance.reset();
return;
}
}
// We should always find the host.
llassert(false);
}
instance.reset();
}
bool AIPerServiceRequestQueue::throttled() const
{
llassert(mAdded <= int(curl_concurrent_connections_per_service));
return mAdded == int(curl_concurrent_connections_per_service);
}
void AIPerServiceRequestQueue::added_to_multi_handle(void)
{
llassert(mAdded < int(curl_concurrent_connections_per_service));
++mAdded;
}
void AIPerServiceRequestQueue::removed_from_multi_handle(void)
{
--mAdded;
llassert(mAdded >= 0);
}
void AIPerServiceRequestQueue::queue(AICurlEasyRequest const& easy_request)
{
mQueuedRequests.push_back(easy_request.get_ptr());
sTotalQueued++;
}
bool AIPerServiceRequestQueue::cancel(AICurlEasyRequest const& easy_request)
{
queued_request_type::iterator const end = mQueuedRequests.end();
queued_request_type::iterator cur = std::find(mQueuedRequests.begin(), end, easy_request.get_ptr());
if (cur == end)
return false; // Not found.
// We can't use erase because that uses assignment to move elements,
// because it isn't thread-safe. Therefore, move the element that we found to
// the back with swap (could just swap with the end immediately, but I don't
// want to break the order in which requests where added). Swap is also not
// thread-safe, but OK here because it only touches the objects in the deque,
// and the deque is protected by the lock on the AIPerServiceRequestQueue object.
queued_request_type::iterator prev = cur;
while (++cur != end)
{
prev->swap(*cur); // This is safe,
prev = cur;
}
mQueuedRequests.pop_back(); // if this is safe.
--sTotalQueued;
llassert(sTotalQueued >= 0);
return true;
}
void AIPerServiceRequestQueue::add_queued_to(curlthread::MultiHandle* multi_handle)
{
if (!mQueuedRequests.empty())
{
multi_handle->add_easy_request(mQueuedRequests.front());
mQueuedRequests.pop_front();
llassert(sTotalQueued > 0);
if (!--sTotalQueued)
{
// We obtained a request from the queue, and after that there we no more request in any queue.
sQueueEmpty = true;
}
else
{
// We obtained a request from the queue, and even after that there was at least one more request in some queue.
sQueueFull = true;
}
if (mQueuedRequests.empty())
{
// We obtained a request from the queue, and after that there we no more request in the queue of this host.
mQueueEmpty = true;
}
else
{
// We obtained a request from the queue, and even after that there was at least one more request in the queue of this host.
mQueueFull = true;
}
}
else
{
// We can add a new request, but there is none in the queue!
mRequestStarvation = true;
if (sTotalQueued == 0)
{
// The queue of every host is empty!
sRequestStarvation = true;
}
}
}
//static
void AIPerServiceRequestQueue::purge(void)
{
instance_map_wat instance_map_w(sInstanceMap);
for (iterator host = instance_map_w->begin(); host != instance_map_w->end(); ++host)
{
Dout(dc::curl, "Purging queue of host \"" << host->first << "\".");
PerServiceRequestQueue_wat per_service_w(*host->second);
size_t s = per_service_w->mQueuedRequests.size();
per_service_w->mQueuedRequests.clear();
sTotalQueued -= s;
llassert(sTotalQueued >= 0);
}
}

View File

@@ -0,0 +1,173 @@
/**
* @file aicurlperservice.h
* @brief Definition of class AIPerServiceRequestQueue
*
* Copyright (c) 2012, 2013, Aleric Inglewood.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* There are special exceptions to the terms and conditions of the GPL as
* it is applied to this Source Code. View the full text of the exception
* in the file doc/FLOSS-exception.txt in this software distribution.
*
* CHANGELOG
* and additional copyright holders.
*
* 04/11/2012
* Initial version, written by Aleric Inglewood @ SL
*
* 06/04/2013
* Renamed AIPrivate::PerHostRequestQueue[Ptr] to AIPerHostRequestQueue[Ptr]
* to allow public access.
*
* 09/04/2013
* Renamed everything "host" to "service" and use "hostname:port" as key
* instead of just "hostname".
*/
#ifndef AICURLPERSERVICE_H
#define AICURLPERSERVICE_H
#include "llerror.h" // llassert
#include <string>
#include <deque>
#include <map>
#include <boost/intrusive_ptr.hpp>
#include "aithreadsafe.h"
class AICurlEasyRequest;
class AIPerServiceRequestQueue;
namespace AICurlPrivate {
namespace curlthread { class MultiHandle; }
class RefCountedThreadSafePerServiceRequestQueue;
class ThreadSafeBufferedCurlEasyRequest;
// Forward declaration of BufferedCurlEasyRequestPtr (see aicurlprivate.h).
typedef boost::intrusive_ptr<ThreadSafeBufferedCurlEasyRequest> BufferedCurlEasyRequestPtr;
// AIPerServiceRequestQueue objects are created by the curl thread and destructed by the main thread.
// We need locking.
typedef AIThreadSafeSimpleDC<AIPerServiceRequestQueue> threadsafe_PerServiceRequestQueue;
typedef AIAccessConst<AIPerServiceRequestQueue> PerServiceRequestQueue_crat;
typedef AIAccess<AIPerServiceRequestQueue> PerServiceRequestQueue_rat;
typedef AIAccess<AIPerServiceRequestQueue> PerServiceRequestQueue_wat;
} // namespace AICurlPrivate
// We can't put threadsafe_PerServiceRequestQueue in a std::map because you can't copy a mutex.
// Therefore, use an intrusive pointer for the threadsafe type.
typedef boost::intrusive_ptr<AICurlPrivate::RefCountedThreadSafePerServiceRequestQueue> AIPerServiceRequestQueuePtr;
//-----------------------------------------------------------------------------
// AIPerServiceRequestQueue
// This class provides a static interface to create and maintain instances
// of AIPerServiceRequestQueue objects, so that at any moment there is at most
// one instance per hostname:port. Those instances then are used to queue curl
// requests when the maximum number of connections for that host already
// have been reached.
class AIPerServiceRequestQueue {
private:
typedef std::map<std::string, AIPerServiceRequestQueuePtr> instance_map_type;
typedef AIThreadSafeSimpleDC<instance_map_type> threadsafe_instance_map_type;
typedef AIAccess<instance_map_type> instance_map_rat;
typedef AIAccess<instance_map_type> instance_map_wat;
static threadsafe_instance_map_type sInstanceMap; // Map of AIPerServiceRequestQueue instances with the hostname as key.
friend class AIThreadSafeSimpleDC<AIPerServiceRequestQueue>; //threadsafe_PerServiceRequestQueue
AIPerServiceRequestQueue(void) : mQueuedCommands(0), mAdded(0), mQueueEmpty(false), mQueueFull(false), mRequestStarvation(false) { }
public:
typedef instance_map_type::iterator iterator;
typedef instance_map_type::const_iterator const_iterator;
// Utility function; extract canonical (lowercase) hostname and port from url.
static std::string extract_canonical_servicename(std::string const& url);
// Return (possibly create) a unique instance for the given hostname.
static AIPerServiceRequestQueuePtr instance(std::string const& servicename);
// Release instance (object will be deleted if this was the last instance).
static void release(AIPerServiceRequestQueuePtr& instance);
// Remove everything. Called upon viewer exit.
static void purge(void);
private:
typedef std::deque<AICurlPrivate::BufferedCurlEasyRequestPtr> queued_request_type;
int mQueuedCommands; // Number of add commands (minus remove commands) with this host in the command queue.
int mAdded; // Number of active easy handles with this host.
queued_request_type mQueuedRequests; // Waiting (throttled) requests.
static LLAtomicS32 sTotalQueued; // The sum of mQueuedRequests.size() of all AIPerServiceRequestQueue objects together.
bool mQueueEmpty; // Set to true when the queue becomes precisely empty.
bool mQueueFull; // Set to true when the queue is popped and then still isn't empty;
bool mRequestStarvation; // Set to true when the queue was about to be popped but was already empty.
static bool sQueueEmpty; // Set to true when sTotalQueued becomes precisely zero as the result of popping any queue.
static bool sQueueFull; // Set to true when sTotalQueued is still larger than zero after popping any queue.
static bool sRequestStarvation; // Set to true when any queue was about to be popped when sTotalQueued was already zero.
public:
void added_to_command_queue(void) { ++mQueuedCommands; }
void removed_from_command_queue(void) { --mQueuedCommands; llassert(mQueuedCommands >= 0); }
void added_to_multi_handle(void); // Called when an easy handle for this host has been added to the multi handle.
void removed_from_multi_handle(void); // Called when an easy handle for this host is removed again from the multi handle.
bool throttled(void) const; // Returns true if the maximum number of allowed requests for this host have been added to the multi handle.
void queue(AICurlEasyRequest const& easy_request); // Add easy_request to the queue.
bool cancel(AICurlEasyRequest const& easy_request); // Remove easy_request from the queue (if it's there).
void add_queued_to(AICurlPrivate::curlthread::MultiHandle* mh);
// Add queued easy handle (if any) to the multi handle. The request is removed from the queue,
// followed by either a call to added_to_multi_handle() or to queue() to add it back.
S32 pipelined_requests(void) const { return mQueuedCommands + mQueuedRequests.size() + mAdded; }
static S32 total_queued_size(void) { return sTotalQueued; }
// Returns true if curl can handle another request for this host.
// Should return false if the maximum allowed HTTP bandwidth is reached, or when
// the latency between request and actual delivery becomes too large.
static bool wantsMoreHTTPRequestsFor(AIPerServiceRequestQueuePtr const& per_service, bool too_much_bandwidth);
private:
// Disallow copying.
AIPerServiceRequestQueue(AIPerServiceRequestQueue const&) { }
};
namespace AICurlPrivate {
class RefCountedThreadSafePerServiceRequestQueue : public threadsafe_PerServiceRequestQueue {
public:
RefCountedThreadSafePerServiceRequestQueue(void) : mReferenceCount(0) { }
bool exactly_two_left(void) const { return mReferenceCount == 2; }
private:
// Used by AIPerServiceRequestQueuePtr. Object is deleted when reference count reaches zero.
LLAtomicU32 mReferenceCount;
friend void intrusive_ptr_add_ref(RefCountedThreadSafePerServiceRequestQueue* p);
friend void intrusive_ptr_release(RefCountedThreadSafePerServiceRequestQueue* p);
};
extern U32 curl_concurrent_connections_per_service;
} // namespace AICurlPrivate
#endif // AICURLPERSERVICE_H

View File

@@ -34,7 +34,7 @@
#include <sstream>
#include "llatomic.h"
#include "llrefcount.h"
#include "aicurlperhost.h"
#include "aicurlperservice.h"
#include "aihttptimeout.h"
#include "llhttpclient.h"
@@ -304,8 +304,8 @@ class CurlEasyRequest : public CurlEasyHandle {
CURLcode mResult; //AIFIXME: this does not belong in the request object, but belongs in the response object.
AIHTTPTimeoutPolicy const* mTimeoutPolicy;
std::string mLowercaseHostname; // Lowercase hostname (canonicalized) extracted from the url.
PerHostRequestQueuePtr mPerHostPtr; // Pointer to the corresponding PerHostRequestQueue.
std::string mLowercaseServicename; // Lowercase hostname:port (canonicalized) extracted from the url.
AIPerServiceRequestQueuePtr mPerServicePtr; // Pointer to the corresponding AIPerServiceRequestQueue.
LLPointer<curlthread::HTTPTimeout> mTimeout;// Timeout administration object associated with last created CurlSocketInfo.
bool mTimeoutIsOrphan; // Set to true when mTimeout is not (yet) associated with a CurlSocketInfo.
#if defined(CWDEBUG) || defined(DEBUG_CURLIO)
@@ -316,7 +316,8 @@ class CurlEasyRequest : public CurlEasyHandle {
public:
// These two are only valid after finalizeRequest.
AIHTTPTimeoutPolicy const* getTimeoutPolicy(void) const { return mTimeoutPolicy; }
std::string const& getLowercaseHostname(void) const { return mLowercaseHostname; }
std::string const& getLowercaseServicename(void) const { return mLowercaseServicename; }
std::string getLowercaseHostname(void) const;
// Called by CurlSocketInfo to allow access to the last (after a redirect) HTTPTimeout object related to this request.
// This creates mTimeout (unless mTimeoutIsOrphan is set in which case it adopts the orphan).
LLPointer<curlthread::HTTPTimeout>& get_timeout_object(void);
@@ -347,10 +348,10 @@ class CurlEasyRequest : public CurlEasyHandle {
inline ThreadSafeBufferedCurlEasyRequest* get_lockobj(void);
inline ThreadSafeBufferedCurlEasyRequest const* get_lockobj(void) const;
// PerHost API.
PerHostRequestQueuePtr getPerHostPtr(void); // (Optionally create and) return a pointer to the unique
// PerHostRequestQueue corresponding to mLowercaseHostname.
bool removeFromPerHostQueue(AICurlEasyRequest const&) const; // Remove this request from the per-host queue, if queued at all.
// PerService API.
AIPerServiceRequestQueuePtr getPerServicePtr(void); // (Optionally create and) return a pointer to the unique
// AIPerServiceRequestQueue corresponding to mLowercaseServicename.
bool removeFromPerServiceQueue(AICurlEasyRequest const&) const; // Remove this request from the per-host queue, if queued at all.
// Returns true if it was queued.
protected:
// Pass events to parent.

View File

@@ -32,7 +32,7 @@
#include "aicurlthread.h"
#include "aihttptimeoutpolicy.h"
#include "aihttptimeout.h"
#include "aicurlperhost.h"
#include "aicurlperservice.h"
#include "lltimer.h" // ms_sleep, get_clock_count
#include "llhttpstatuscodes.h"
#include "llbuffer.h"
@@ -204,6 +204,9 @@ int ioctlsocket(int fd, int, unsigned long* nonblocking_enable)
namespace AICurlPrivate {
LLAtomicS32 max_pipelined_requests(32);
LLAtomicS32 max_pipelined_requests_per_service(8);
enum command_st {
cmd_none,
cmd_add,
@@ -264,9 +267,15 @@ void Command::reset(void)
//
// If at this point addRequest is called again, then it is detected that the ThreadSafeBufferedCurlEasyRequest is active.
struct command_queue_st {
std::deque<Command> commands; // The commands
size_t size; // Number of add commands in the queue minus the number of remove commands.
};
// Multi-threaded queue for passing Command objects from the main-thread to the curl-thread.
AIThreadSafeSimpleDC<std::deque<Command> > command_queue;
typedef AIAccess<std::deque<Command> > command_queue_wat;
AIThreadSafeSimpleDC<command_queue_st> command_queue; // Fills 'size' with zero, because it's a global.
typedef AIAccess<command_queue_st> command_queue_wat;
typedef AIAccess<command_queue_st> command_queue_rat;
AIThreadSafeDC<Command> command_being_processed;
typedef AIWriteAccess<Command> command_being_processed_wat;
@@ -1289,7 +1298,7 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w)
// Access command_queue, and move command to command_being_processed.
{
command_queue_wat command_queue_w(command_queue);
if (command_queue_w->empty())
if (command_queue_w->commands.empty())
{
mWakeUpFlagMutex.lock();
mWakeUpFlag = false;
@@ -1297,8 +1306,22 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w)
break;
}
// Move the next command from the queue into command_being_processed.
*command_being_processed_wat(command_being_processed) = command_queue_w->front();
command_queue_w->pop_front();
command_st command;
{
command_being_processed_wat command_being_processed_w(command_being_processed);
*command_being_processed_w = command_queue_w->commands.front();
command = command_being_processed_w->command();
}
// Update the size: the number netto number of pending requests in the command queue.
command_queue_w->commands.pop_front();
if (command == cmd_add)
{
command_queue_w->size--;
}
else if (command == cmd_remove)
{
command_queue_w->size++;
}
}
// Access command_being_processed only.
{
@@ -1309,9 +1332,11 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w)
case cmd_boost: // FIXME: future stuff
break;
case cmd_add:
PerServiceRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue();
multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()));
break;
case cmd_remove:
PerServiceRequestQueue_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'.
multi_handle_w->remove_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), true);
break;
}
@@ -1520,8 +1545,8 @@ void AICurlThread::run(void)
continue;
}
// Clock count used for timeouts.
HTTPTimeout::sClockCount = get_clock_count();
Dout(dc::curl, "HTTPTimeout::sClockCount = " << HTTPTimeout::sClockCount);
HTTPTimeout::sTime_10ms = get_clock_count() * HTTPTimeout::sClockWidth_10ms;
Dout(dc::curl, "HTTPTimeout::sTime_10ms = " << HTTPTimeout::sTime_10ms);
if (ready == 0)
{
multi_handle_w->socket_action(CURL_SOCKET_TIMEOUT, 0);
@@ -1553,7 +1578,7 @@ void AICurlThread::run(void)
multi_handle_w->check_msg_queue();
}
// Clear the queued requests.
PerHostRequestQueue::purge();
AIPerServiceRequestQueue::purge();
}
AICurlMultiHandle::destroyInstance();
}
@@ -1561,6 +1586,8 @@ void AICurlThread::run(void)
//-----------------------------------------------------------------------------
// MultiHandle
LLAtomicU32 MultiHandle::sTotalAdded;
MultiHandle::MultiHandle(void) : mTimeout(-1), mReadPollSet(NULL), mWritePollSet(NULL)
{
mReadPollSet = new PollSet;
@@ -1653,6 +1680,7 @@ CURLMcode MultiHandle::socket_action(curl_socket_t sockfd, int ev_bitmask)
}
while(res == CURLM_CALL_MULTI_PERFORM);
llassert(mAddedEasyRequests.size() >= (size_t)running_handles);
AICurlInterface::Stats::running_handles = running_handles;
return res;
}
@@ -1677,17 +1705,17 @@ static U32 curl_max_total_concurrent_connections = 32; // Initialized on st
void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
{
bool throttled = true; // Default.
PerHostRequestQueuePtr per_host;
AIPerServiceRequestQueuePtr per_service;
{
AICurlEasyRequest_wat curl_easy_request_w(*easy_request);
per_host = curl_easy_request_w->getPerHostPtr();
PerHostRequestQueue_wat per_host_w(*per_host);
if (mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_host_w->throttled())
per_service = curl_easy_request_w->getPerServicePtr();
PerServiceRequestQueue_wat per_service_w(*per_service);
if (mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled())
{
curl_easy_request_w->set_timeout_opts();
if (curl_easy_request_w->add_handle_to_multi(curl_easy_request_w, mMultiHandle) == CURLM_OK)
{
per_host_w->added_to_multi_handle(); // (About to be) added to mAddedEasyRequests.
per_service_w->added_to_multi_handle(); // (About to be) added to mAddedEasyRequests.
throttled = false; // Fall through...
}
}
@@ -1696,11 +1724,14 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
{ // ... to here.
std::pair<addedEasyRequests_type::iterator, bool> res = mAddedEasyRequests.insert(easy_request);
llassert(res.second); // May not have been added before.
Dout(dc::curl, "MultiHandle::add_easy_request: Added AICurlEasyRequest " << (void*)easy_request.get_ptr().get() << "; now processing " << mAddedEasyRequests.size() << " easy handles.");
sTotalAdded++;
llassert(sTotalAdded == mAddedEasyRequests.size());
Dout(dc::curl, "MultiHandle::add_easy_request: Added AICurlEasyRequest " << (void*)easy_request.get_ptr().get() <<
"; now processing " << mAddedEasyRequests.size() << " easy handles [running_handles = " << AICurlInterface::Stats::running_handles << "].");
return;
}
// The request could not be added, we have to queue it.
PerHostRequestQueue_wat(*per_host)->queue(easy_request);
PerServiceRequestQueue_wat(*per_service)->queue(easy_request);
#ifdef SHOW_ASSERT
// Not active yet, but it's no longer an error if next we try to remove the request.
AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = false;
@@ -1717,7 +1748,7 @@ CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request
#ifdef SHOW_ASSERT
bool removed =
#endif
easy_request_w->removeFromPerHostQueue(easy_request);
easy_request_w->removeFromPerServiceQueue(easy_request);
#ifdef SHOW_ASSERT
if (removed)
{
@@ -1733,12 +1764,12 @@ CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request
CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator const& iter, bool as_per_command)
{
CURLMcode res;
PerHostRequestQueuePtr per_host;
AIPerServiceRequestQueuePtr per_service;
{
AICurlEasyRequest_wat curl_easy_request_w(**iter);
res = curl_easy_request_w->remove_handle_from_multi(curl_easy_request_w, mMultiHandle);
per_host = curl_easy_request_w->getPerHostPtr();
PerHostRequestQueue_wat(*per_host)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests.
per_service = curl_easy_request_w->getPerServicePtr();
PerServiceRequestQueue_wat(*per_service)->removed_from_multi_handle(); // (About to be) removed from mAddedEasyRequests.
#ifdef SHOW_ASSERT
curl_easy_request_w->mRemovedPerCommand = as_per_command;
#endif
@@ -1747,12 +1778,15 @@ CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator cons
ThreadSafeBufferedCurlEasyRequest* lockobj = iter->get_ptr().get();
#endif
mAddedEasyRequests.erase(iter);
--sTotalAdded;
llassert(sTotalAdded == mAddedEasyRequests.size());
#if CWDEBUG
Dout(dc::curl, "MultiHandle::remove_easy_request: Removed AICurlEasyRequest " << (void*)lockobj << "; now processing " << mAddedEasyRequests.size() << " easy handles.");
Dout(dc::curl, "MultiHandle::remove_easy_request: Removed AICurlEasyRequest " << (void*)lockobj <<
"; now processing " << mAddedEasyRequests.size() << " easy handles [running_handles = " << AICurlInterface::Stats::running_handles << "].");
#endif
// Attempt to add a queued request, if any.
PerHostRequestQueue_wat(*per_host)->add_queued_to(this);
PerServiceRequestQueue_wat(*per_service)->add_queued_to(this);
return res;
}
@@ -1926,7 +1960,8 @@ void clearCommandQueue(void)
{
// Clear the command queue now in order to avoid the global deinitialization order fiasco.
command_queue_wat command_queue_w(command_queue);
command_queue_w->clear();
command_queue_w->commands.clear();
command_queue_w->size = 0;
}
//-----------------------------------------------------------------------------
@@ -2328,7 +2363,7 @@ void AICurlEasyRequest::addRequest(void)
// Find the last command added.
command_st cmd = cmd_none;
for (std::deque<Command>::iterator iter = command_queue_w->begin(); iter != command_queue_w->end(); ++iter)
for (std::deque<Command>::iterator iter = command_queue_w->commands.begin(); iter != command_queue_w->commands.end(); ++iter)
{
if (*iter == *this)
{
@@ -2354,8 +2389,11 @@ void AICurlEasyRequest::addRequest(void)
}
#endif
// Add a command to add the new request to the multi session to the command queue.
command_queue_w->push_back(Command(*this, cmd_add));
AICurlEasyRequest_wat(*get())->add_queued();
command_queue_w->commands.push_back(Command(*this, cmd_add));
command_queue_w->size++;
AICurlEasyRequest_wat curl_easy_request_w(*get());
PerServiceRequestQueue_wat(*curl_easy_request_w->getPerServicePtr())->added_to_command_queue();
curl_easy_request_w->add_queued();
}
// Something was added to the queue, wake up the thread to get it.
wakeUpCurlThread();
@@ -2378,7 +2416,7 @@ void AICurlEasyRequest::removeRequest(void)
// Find the last command added.
command_st cmd = cmd_none;
for (std::deque<Command>::iterator iter = command_queue_w->begin(); iter != command_queue_w->end(); ++iter)
for (std::deque<Command>::iterator iter = command_queue_w->commands.begin(); iter != command_queue_w->commands.end(); ++iter)
{
if (*iter == *this)
{
@@ -2415,9 +2453,12 @@ void AICurlEasyRequest::removeRequest(void)
}
#endif
// Add a command to remove this request from the multi session to the command queue.
command_queue_w->push_back(Command(*this, cmd_remove));
command_queue_w->commands.push_back(Command(*this, cmd_remove));
command_queue_w->size--;
AICurlEasyRequest_wat curl_easy_request_w(*get());
PerServiceRequestQueue_wat(*curl_easy_request_w->getPerServicePtr())->removed_from_command_queue(); // Note really, but this has the same effect as 'added a remove command'.
// Suppress warning that would otherwise happen if the callbacks are revoked before the curl thread removed the request.
AICurlEasyRequest_wat(*get())->remove_queued();
curl_easy_request_w->remove_queued();
}
// Something was added to the queue, wake up the thread to get it.
wakeUpCurlThread();
@@ -2427,7 +2468,7 @@ void AICurlEasyRequest::removeRequest(void)
namespace AICurlInterface {
void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentConnectionsPerHost, bool NoVerifySSLCert)
void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentConnectionsPerService, bool NoVerifySSLCert)
{
using namespace AICurlPrivate;
using namespace AICurlPrivate::curlthread;
@@ -2436,8 +2477,10 @@ void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentCo
// Cache Debug Settings.
curl_max_total_concurrent_connections = CurlMaxTotalConcurrentConnections;
curl_concurrent_connections_per_host = CurlConcurrentConnectionsPerHost;
curl_concurrent_connections_per_service = CurlConcurrentConnectionsPerService;
gNoVerifySSLCert = NoVerifySSLCert;
max_pipelined_requests = curl_max_total_concurrent_connections;
max_pipelined_requests_per_service = curl_concurrent_connections_per_service;
AICurlThread::sInstance = new AICurlThread;
AICurlThread::sInstance->start();
@@ -2445,19 +2488,24 @@ void startCurlThread(U32 CurlMaxTotalConcurrentConnections, U32 CurlConcurrentCo
bool handleCurlMaxTotalConcurrentConnections(LLSD const& newvalue)
{
using namespace AICurlPrivate;
using namespace AICurlPrivate::curlthread;
U32 old = curl_max_total_concurrent_connections;
curl_max_total_concurrent_connections = newvalue.asInteger();
max_pipelined_requests += curl_max_total_concurrent_connections - old;
llinfos << "CurlMaxTotalConcurrentConnections set to " << curl_max_total_concurrent_connections << llendl;
return true;
}
bool handleCurlConcurrentConnectionsPerHost(LLSD const& newvalue)
bool handleCurlConcurrentConnectionsPerService(LLSD const& newvalue)
{
using namespace AICurlPrivate;
curl_concurrent_connections_per_host = newvalue.asInteger();
llinfos << "CurlConcurrentConnectionsPerHost set to " << curl_concurrent_connections_per_host << llendl;
U32 old = curl_concurrent_connections_per_service;
curl_concurrent_connections_per_service = newvalue.asInteger();
max_pipelined_requests_per_service += curl_concurrent_connections_per_service - old;
llinfos << "CurlConcurrentConnectionsPerService set to " << curl_concurrent_connections_per_service << llendl;
return true;
}
@@ -2467,5 +2515,146 @@ bool handleNoVerifySSLCert(LLSD const& newvalue)
return true;
}
U32 getNumHTTPCommands(void)
{
using namespace AICurlPrivate;
command_queue_rat command_queue_r(command_queue);
return command_queue_r->size;
}
U32 getNumHTTPQueued(void)
{
return AIPerServiceRequestQueue::total_queued_size();
}
U32 getNumHTTPAdded(void)
{
return AICurlPrivate::curlthread::MultiHandle::total_added_size();
}
} // namespace AICurlInterface
// Return true if we want at least one more HTTP request for this host.
//
// It's OK if this function is a bit fuzzy, but we don't want it to return
// true a hundred times on a row when it is called fast in a loop.
// Hence the following consideration:
//
// This function is called only from LLTextureFetchWorker::doWork, and when it returns true
// then doWork will call LLHTTPClient::request with a NULL default engine (signaling that
// it is OK to run in any thread).
//
// At the end, LLHTTPClient::request calls AIStateMachine::run, which in turn calls
// AIStateMachine::reset at the end. Because NULL is passed as default_engine, reset will
// call AIStateMachine::multiplex to immediately start running the state machine. This
// causes it to go through the states bs_reset, bs_initialize and then bs_multiplex with
// run state AICurlEasyRequestStateMachine_addRequest. Finally, in this state, multiplex
// calls AICurlEasyRequestStateMachine::multiplex_impl which then calls AICurlEasyRequest::addRequest
// which causes an increment of command_queue_w->size and AIPerServiceRequestQueue::mQueuedCommands.
//
// It is therefore guaranteed that in one loop of LLTextureFetchWorker::doWork,
// this size is incremented; stopping this function from returning true once we reached the
// threshold of "pipelines" requests (the sum of requests in the command queue, the ones
// throttled and queued in AIPerServiceRequestQueue::mQueuedRequests and the already
// running requests (in MultiHandle::mAddedEasyRequests)).
//
//static
bool AIPerServiceRequestQueue::wantsMoreHTTPRequestsFor(AIPerServiceRequestQueuePtr const& per_service, bool too_much_bandwidth)
{
using namespace AICurlPrivate;
using namespace AICurlPrivate::curlthread;
bool reject, equal, increment_threshold, decrement_threshold;
// Whether or not we're going to approve a new request, decrement the global threshold first, when appropriate.
// Atomic read max_pipelined_requests for the below calculations.
S32 const max_pipelined_requests_cache = max_pipelined_requests;
decrement_threshold = sQueueFull && !sQueueEmpty;
// Reset flags.
sQueueEmpty = sQueueFull = false;
if (decrement_threshold)
{
if (max_pipelined_requests_cache > curl_max_total_concurrent_connections)
{
// Decrement the threshold because since the last call to this function at least one curl request finished
// and was replaced with another request from the queue, but the queue never ran empty: we have too many
// queued requests.
--max_pipelined_requests;
}
}
// Check if it's ok to get a new request for this particular host and update the per-host threshold.
// Atomic read max_pipelined_requests_per_service for the below calculations.
S32 const max_pipelined_requests_per_service_cache = max_pipelined_requests_per_service;
{
PerServiceRequestQueue_rat per_service_r(*per_service);
S32 const pipelined_requests_per_service = per_service_r->pipelined_requests();
reject = pipelined_requests_per_service >= max_pipelined_requests_per_service_cache;
equal = pipelined_requests_per_service == max_pipelined_requests_per_service_cache;
increment_threshold = per_service_r->mRequestStarvation;
decrement_threshold = per_service_r->mQueueFull && !per_service_r->mQueueEmpty;
// Reset flags.
per_service_r->mQueueFull = per_service_r->mQueueEmpty = per_service_r->mRequestStarvation = false;
}
if (decrement_threshold)
{
if (max_pipelined_requests_per_service_cache > curl_concurrent_connections_per_service)
{
--max_pipelined_requests_per_service;
}
}
else if (increment_threshold && reject)
{
if (max_pipelined_requests_per_service_cache < 2 * curl_concurrent_connections_per_service)
{
max_pipelined_requests_per_service++;
// Immediately take the new threshold into account.
reject = !equal;
}
}
if (reject)
{
// Too many request for this host already.
return false;
}
//AIFIXME: better bandwidth check here.
if (too_much_bandwidth)
{
return false; // wait
}
// Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate.
{
command_queue_rat command_queue_r(command_queue);
S32 const pipelined_requests = command_queue_r->size + sTotalQueued + MultiHandle::total_added_size();
// We can't take the command being processed (command_being_processed) into account without
// introducing relatively long waiting times for some mutex (namely between when the command
// is moved from command_queue to command_being_processed, till it's actually being added to
// mAddedEasyRequests). The whole purpose of command_being_processed is to reduce the time
// that things are locked to micro seconds, so we'll just accept an off-by-one fuzziness
// here instead.
// The maximum number of requests that may be queued in command_queue is equal to the total number of requests
// that may exist in the pipeline minus the number of requests queued in AIPerServiceRequestQueue objects, minus
// the number of already running requests.
reject = pipelined_requests >= max_pipelined_requests_cache;
equal = pipelined_requests == max_pipelined_requests_cache;
increment_threshold = sRequestStarvation;
}
if (increment_threshold && reject)
{
if (max_pipelined_requests_cache < 2 * curl_max_total_concurrent_connections)
{
max_pipelined_requests++;
// Immediately take the new threshold into account.
reject = !equal;
}
}
return !reject;
}

View File

@@ -75,12 +75,13 @@ class MultiHandle : public CurlMultiHandle
typedef std::set<AICurlEasyRequest, AICurlEasyRequestCompare> addedEasyRequests_type;
addedEasyRequests_type mAddedEasyRequests; // All easy requests currently added to the multi handle.
long mTimeout; // The last timeout in ms as set by the callback CURLMOPT_TIMERFUNCTION.
static LLAtomicU32 sTotalAdded; // The (sum of the) size of mAddedEasyRequests (of every MultiHandle, but there is only one).
private:
// Store result and trigger events for easy request.
void finish_easy_request(AICurlEasyRequest const& easy_request, CURLcode result);
// Remove easy request at iter (must exist).
// Note that it's possible that a new request from a PerHostRequestQueue::mQueuedRequests is inserted before iter.
// Note that it's possible that a new request from a AIPerServiceRequestQueue::mQueuedRequests is inserted before iter.
CURLMcode remove_easy_request(addedEasyRequests_type::iterator const& iter, bool as_per_command);
static int socket_callback(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp);
@@ -96,6 +97,9 @@ class MultiHandle : public CurlMultiHandle
// Called from the main loop every time select() timed out.
void handle_stalls(void);
// Return the total number of added curl requests.
static U32 total_added_size(void) { return sTotalAdded; }
public:
//-----------------------------------------------------------------------------
// Curl socket administration:

View File

@@ -75,6 +75,7 @@ namespace AICurlPrivate {
class BufferedCurlEasyRequest {
public:
char const* getLowercaseHostname(void) const { return "hostname.com"; }
char const* getLowercaseServicename(void) const { return "hostname.com:12047"; }
void getinfo(const int&, double* p) { *p = 0.1; }
};
@@ -96,8 +97,8 @@ namespace curlthread {
// HTTPTimeout
//static
F64 const HTTPTimeout::sClockWidth = 1.0 / calc_clock_frequency(); // Time between two clock ticks, in seconds.
U64 HTTPTimeout::sClockCount; // Clock count, set once per select() exit.
F64 const HTTPTimeout::sClockWidth_10ms = 100.0 / calc_clock_frequency(); // Time between two clock ticks, in 10ms units.
U64 HTTPTimeout::sTime_10ms; // Time in 10ms units, set once per select() exit.
// CURL-THREAD
// This is called when body data was sent to the server socket.
@@ -125,7 +126,7 @@ bool HTTPTimeout::data_sent(size_t n, bool finished)
// | |
void HTTPTimeout::reset_lowspeed(void)
{
mLowSpeedClock = sClockCount;
mLowSpeedClock = sTime_10ms;
mLowSpeedOn = true;
mLastBytesSent = false; // We're just starting!
mLastSecond = -1; // This causes lowspeed to initialize the rest.
@@ -162,8 +163,8 @@ void HTTPTimeout::upload_finished(void)
// We finished uploading (if there was a body to upload at all), so no more transfer rate timeouts.
mLowSpeedOn = false;
// Timeout if the server doesn't reply quick enough.
mStalled = sClockCount + mPolicy->getReplyDelay() / sClockWidth;
DoutCurl("upload_finished: mStalled set to sClockCount (" << sClockCount << ") + " << (mStalled - sClockCount) << " (" << mPolicy->getReplyDelay() << " seconds)");
mStalled = sTime_10ms + 100 * mPolicy->getReplyDelay();
DoutCurl("upload_finished: mStalled set to Time_10ms (" << sTime_10ms << ") + " << (mStalled - sTime_10ms) << " (" << mPolicy->getReplyDelay() << " seconds)");
}
// CURL-THREAD
@@ -230,8 +231,7 @@ bool HTTPTimeout::lowspeed(size_t bytes, bool finished)
// less than low_speed_limit, we abort.
// When are we?
S32 second = (sClockCount - mLowSpeedClock) * sClockWidth;
llassert(sClockWidth > 0.0);
S32 second = (sTime_10ms - mLowSpeedClock) / 100;
// This REALLY should never happen, but due to another bug it did happened
// and caused something so evil and hard to find that... NEVER AGAIN!
llassert(second >= 0);
@@ -315,8 +315,8 @@ bool HTTPTimeout::lowspeed(size_t bytes, bool finished)
// Just give these bytes 4 more seconds to be written to the socket (after which we'll
// assume that the 'upload finished' detection failed and we'll wait another ReplyDelay
// seconds before finally, actually timing out.
mStalled = sClockCount + 4 / sClockWidth;
DoutCurl("mStalled set to sClockCount (" << sClockCount << ") + " << (mStalled - sClockCount) << " (4 seconds)");
mStalled = sTime_10ms + 400; // 4 seconds into the future.
DoutCurl("mStalled set to sTime_10ms (" << sTime_10ms << ") + 400 (4 seconds)");
return false;
}
// The average transfer rate over the passed low_speed_time seconds is too low. Abort the transfer.
@@ -368,8 +368,8 @@ bool HTTPTimeout::lowspeed(size_t bytes, bool finished)
while(total_bytes >= mintotalbytes);
}
// If this function isn't called again within max_stall_time seconds, we stalled.
mStalled = sClockCount + max_stall_time / sClockWidth;
DoutCurl("mStalled set to sClockCount (" << sClockCount << ") + " << (mStalled - sClockCount) << " (" << max_stall_time << " seconds)");
mStalled = sTime_10ms + 100 * max_stall_time;
DoutCurl("mStalled set to sTime_10ms (" << sTime_10ms << ") + " << (mStalled - sTime_10ms) << " (" << max_stall_time << " seconds)");
return false;
}
@@ -435,7 +435,7 @@ bool HTTPTimeout::maybe_upload_finished(void)
void HTTPTimeout::print_diagnostics(CurlEasyRequest const* curl_easy_request, char const* eff_url)
{
#ifndef HTTPTIMEOUT_TESTSUITE
llwarns << "Request to \"" << curl_easy_request->getLowercaseHostname() << "\" timed out for " << curl_easy_request->getTimeoutPolicy()->name() << llendl;
llwarns << "Request to \"" << curl_easy_request->getLowercaseServicename() << "\" timed out for " << curl_easy_request->getTimeoutPolicy()->name() << llendl;
llinfos << "Effective URL: \"" << eff_url << "\"." << llendl;
double namelookup_time, connect_time, appconnect_time, pretransfer_time, starttransfer_time;
curl_easy_request->getinfo(CURLINFO_NAMELOOKUP_TIME, &namelookup_time);

View File

@@ -85,11 +85,11 @@ class HTTPTimeout : public LLRefCount {
S32 mLastSecond; // The time at which lowspeed() was last called, in seconds since mLowSpeedClock.
S32 mOverwriteSecond; // The second at which the first bucket of this transfer will be overwritten.
U32 mTotalBytes; // The sum of all bytes in mBuckets.
U64 mLowSpeedClock; // Clock count at which low speed detection (re)started.
U64 mStalled; // The clock count at which this transaction is considered to be stalling if nothing is transfered anymore.
U64 mLowSpeedClock; // The time (sTime_10ms) at which low speed detection (re)started.
U64 mStalled; // The time (sTime_10ms) at which this transaction is considered to be stalling if nothing is transfered anymore.
public:
static F64 const sClockWidth; // Time between two clock ticks in seconds.
static U64 sClockCount; // Clock count used as 'now' during one loop of the main loop.
static F64 const sClockWidth_10ms; // Time between two clock ticks in 10 ms units.
static U64 sTime_10ms; // Time since the epoch in 10 ms units.
#if defined(CWDEBUG) || defined(DEBUG_CURLIO)
ThreadSafeBufferedCurlEasyRequest* mLockObj;
#endif
@@ -121,7 +121,7 @@ class HTTPTimeout : public LLRefCount {
void done(AICurlEasyRequest_wat const& curlEasyRequest_w, CURLcode code);
// Returns true when we REALLY timed out. Might call upload_finished heuristically.
bool has_stalled(void) { return mStalled < sClockCount && !maybe_upload_finished(); }
bool has_stalled(void) { return mStalled < sTime_10ms && !maybe_upload_finished(); }
// Called from BufferedCurlEasyRequest::processOutput if a timeout occurred.
void print_diagnostics(CurlEasyRequest const* curl_easy_request, char const* eff_url);

View File

@@ -171,40 +171,7 @@
<key>Value</key>
<integer>1</integer>
</map>
<key>HTTPRequestRate</key>
<map>
<key>Comment</key>
<string>Number of HTTP texture requests fired per second.</string>
<key>Persist</key>
<integer>1</integer>
<key>Type</key>
<string>U32</string>
<key>Value</key>
<integer>30</integer>
</map>
<key>HTTPMaxRequests</key>
<map>
<key>Comment</key>
<string>Maximum number of simultaneous HTTP requests in progress.</string>
<key>Persist</key>
<integer>1</integer>
<key>Type</key>
<string>U32</string>
<key>Value</key>
<integer>12</integer>
</map>
<key>HTTPMinRequests</key>
<map>
<key>Comment</key>
<string>Attempt to maintain at least this many HTTP requests in progress by ignoring bandwidth</string>
<key>Persist</key>
<integer>1</integer>
<key>Type</key>
<string>U32</string>
<key>Value</key>
<integer>2</integer>
</map>
<key>HTTPThrottleBandwidth</key>
<map>
<key>Comment</key>
@@ -4481,10 +4448,10 @@ This should be as low as possible, but too low may break functionality</string>
<key>Value</key>
<integer>64</integer>
</map>
<key>CurlConcurrentConnectionsPerHost</key>
<key>CurlConcurrentConnectionsPerService</key>
<map>
<key>Comment</key>
<string>Maximum number of simultaneous curl connections per host</string>
<string>Maximum number of simultaneous curl connections per host:port service</string>
<key>Persist</key>
<integer>0</integer>
<key>Type</key>

View File

@@ -1911,7 +1911,7 @@ bool LLAppViewer::initThreads()
startEngineThread();
AICurlInterface::startCurlThread(gSavedSettings.getU32("CurlMaxTotalConcurrentConnections"),
gSavedSettings.getU32("CurlConcurrentConnectionsPerHost"),
gSavedSettings.getU32("CurlConcurrentConnectionsPerService"),
gSavedSettings.getBOOL("NoVerifySSLCert"));
LLImage::initClass();

View File

@@ -254,6 +254,7 @@ private:
LLUUID mID;
LLHost mHost;
std::string mUrl;
AIPerServiceRequestQueuePtr mPerServicePtr; // Pointer to the AIPerServiceRequestQueue corresponding to the host of mUrl.
U8 mType;
F32 mImagePriority;
U32 mWorkPriority;
@@ -496,30 +497,6 @@ public:
SGHostBlackList::blacklist_t SGHostBlackList::blacklist;
#if 0
//call every time a connection is opened
//return true if connecting allowed
static bool sgConnectionThrottle() {
const U32 THROTTLE_TIMESTEPS_PER_SECOND = 10;
static const LLCachedControl<U32> max_connections_per_second("HTTPRequestRate", 30);
U32 max_connections = max_connections_per_second/THROTTLE_TIMESTEPS_PER_SECOND;
const U32 timestep = USEC_PER_SEC/THROTTLE_TIMESTEPS_PER_SECOND;
U64 now = LLTimer::getTotalTime();
std::deque<U64> timestamps;
while(!timestamps.empty() && (timestamps[0]<=now-timestep)) {
timestamps.pop_front();
}
if(timestamps.size() < max_connections) {
//llinfos << "throttle pass" << llendl;
timestamps.push_back(now);
return true;
} else {
//llinfos << "throttle fail" << llendl;
return false;
}
}
#endif
//////////////////////////////////////////////////////////////////////////////
// Cross-thread messaging for asset metrics.
@@ -820,6 +797,17 @@ LLTextureFetchWorker::LLTextureFetchWorker(LLTextureFetch* fetcher,
{
mCanUseNET = mUrl.empty() ;
if (!mCanUseNET)
{
// Probably a file://, but well; in that case servicename will be empty.
std::string servicename = AIPerServiceRequestQueue::extract_canonical_servicename(mUrl);
if (!servicename.empty())
{
// Make sure mPerServicePtr is up to date with mUrl.
mPerServicePtr = AIPerServiceRequestQueue::instance(servicename);
}
}
calcWorkPriority();
mType = host.isOk() ? LLImageBase::TYPE_AVATAR_BAKE : LLImageBase::TYPE_NORMAL;
//llinfos << "Create: " << mID << " mHost:" << host << " Discard=" << discard << " URL:"<< mUrl << llendl;
@@ -1175,6 +1163,7 @@ bool LLTextureFetchWorker::doWork(S32 param)
{
mUrl = http_url + "/?texture_id=" + mID.asString().c_str();
mWriteToCacheState = CAN_WRITE ; //because this texture has a fixed texture id.
mPerServicePtr = AIPerServiceRequestQueue::instance(AIPerServiceRequestQueue::extract_canonical_servicename(http_url));
}
else
{
@@ -1260,40 +1249,14 @@ bool LLTextureFetchWorker::doWork(S32 param)
{
if(mCanUseHTTP)
{
//NOTE:
//control the number of the http requests issued for:
//1, not opening too many file descriptors at the same time;
//2, control the traffic of http so udp gets bandwidth.
//
static const LLCachedControl<U32> max_http_requests("HTTPMaxRequests", 8);
static const LLCachedControl<U32> min_http_requests("HTTPMinRequests", 2);
static const LLCachedControl<F32> throttle_bandwidth("HTTPThrottleBandwidth", 2000);
// Don't control http bandwidth in Avination, they do it serverside
if(!gHippoGridManager->getConnectedGrid()->isAvination())
{
if(((U32)mFetcher->getNumHTTPRequests() >= max_http_requests) ||
((mFetcher->getTextureBandwidth() > throttle_bandwidth) &&
((U32)mFetcher->getNumHTTPRequests() > min_http_requests)))
{
return false ; //wait.
}
}
else
{
if(((U32)mFetcher->getNumHTTPRequests() >= 2))
{
return false ; //wait.
}
}
mFetcher->removeFromNetworkQueue(this, false);
S32 cur_size = 0;
if (mFormattedImage.notNull())
{
cur_size = mFormattedImage->getDataSize(); // amount of data we already have
if (mFormattedImage->getDiscardLevel() == 0)
{
// Already have all data.
mFetcher->removeFromNetworkQueue(this, false); // Note sure this is necessary, but it's what the old did --Aleric
if(cur_size > 0)
{
// We already have all the data, just decode it
@@ -1307,16 +1270,32 @@ bool LLTextureFetchWorker::doWork(S32 param)
}
}
}
// Let AICurl decide if we can process more HTTP requests at the moment or not.
static const LLCachedControl<F32> throttle_bandwidth("HTTPThrottleBandwidth", 2000);
if (!AIPerServiceRequestQueue::wantsMoreHTTPRequestsFor(mPerServicePtr, mFetcher->getTextureBandwidth() > throttle_bandwidth))
{
return false ; //wait.
}
mFetcher->removeFromNetworkQueue(this, false);
mRequestedSize = mDesiredSize - cur_size;
mRequestedDiscard = mDesiredDiscard;
mRequestedOffset = cur_size;
bool res = false;
if (!mUrl.empty())
{
mLoaded = FALSE;
mGetStatus = 0;
mGetReason.clear();
// Note: comparing mFetcher->getTextureBandwidth() with throttle_bandwidth is a bit like
// comparing apples and oranges, but it's only debug output. The first is the averaged
// bandwidth used for the body of successfully downloaded textures, averaged over roughtly
// 10 seconds, in kbits/s. The latter is the limit of the actual http curl downloads,
// including header and failures for anything (not just textures), averaged over the last
// second, also in kbits/s.
static const LLCachedControl<F32> throttle_bandwidth("HTTPThrottleBandwidth", 2000);
LL_DEBUGS("Texture") << "HTTP GET: " << mID << " Offset: " << mRequestedOffset
<< " Bytes: " << mRequestedSize
@@ -2273,15 +2252,6 @@ S32 LLTextureFetch::getNumRequests()
return size ;
}
S32 LLTextureFetch::getNumHTTPRequests()
{
mNetworkQueueMutex.lock() ;
S32 size = (S32)mHTTPTextureQueue.size();
mNetworkQueueMutex.unlock() ;
return size ;
}
U32 LLTextureFetch::getTotalNumHTTPRequests()
{
mNetworkQueueMutex.lock() ;

View File

@@ -88,7 +88,6 @@ public:
U32& fetch_priority_p, F32& fetch_dtime_p, F32& request_dtime_p, bool& can_use_http);
void dump();
S32 getNumRequests() ;
S32 getNumHTTPRequests() ;
U32 getTotalNumHTTPRequests() ;
// Public for access by callbacks

View File

@@ -64,6 +64,14 @@ LLTextureSizeView *gTextureCategoryView = NULL;
//static
std::set<LLViewerFetchedTexture*> LLTextureView::sDebugImages;
// Forward declaration.
namespace AICurlInterface {
U32 getNumHTTPCommands(void);
U32 getNumHTTPQueued(void);
U32 getNumHTTPAdded(void);
U32 getNumHTTPRunning(void);
} // namespace AICurlInterface
////////////////////////////////////////////////////////////////////////////
static std::string title_string1a("Tex UUID Area DDis(Req) DecodePri(Fetch) [download] pk/max");
@@ -593,7 +601,7 @@ void LLGLTexMemBar::draw()
#endif
//----------------------------------------------------------------------------
text = llformat("Textures: %d Fetch: %d(%d) Pkts:%d(%d) Cache R/W: %d/%d LFS:%d IW:%d RAW:%d(%d) HTP:%d DEC:%d CRE:%d ",
text = llformat("Textures: %d Fetch: %d(%d) Pkts:%d(%d) Cache R/W: %d/%d LFS:%d IW:%d RAW:%d(%d) HTTP:%d/%d/%d/%d DEC:%d CRE:%d ",
gTextureList.getNumImages(),
LLAppViewer::getTextureFetch()->getNumRequests(), LLAppViewer::getTextureFetch()->getNumDeletes(),
LLAppViewer::getTextureFetch()->mPacketCount, LLAppViewer::getTextureFetch()->mBadPacketCount,
@@ -601,7 +609,10 @@ void LLGLTexMemBar::draw()
LLLFSThread::sLocal->getPending(),
LLAppViewer::getImageDecodeThread()->getPending(),
LLImageRaw::sRawImageCount, LLImageRaw::sRawImageCachedCount,
LLAppViewer::getTextureFetch()->getNumHTTPRequests(),
AICurlInterface::getNumHTTPCommands(),
AICurlInterface::getNumHTTPQueued(),
AICurlInterface::getNumHTTPAdded(),
AICurlInterface::getNumHTTPRunning(),
LLAppViewer::getImageDecodeThread()->getPending(),
gTextureList.mCreateTextureList.size());
@@ -609,7 +620,13 @@ void LLGLTexMemBar::draw()
text_color, LLFontGL::LEFT, LLFontGL::TOP);
left += LLFontGL::getFontMonospace()->getWidth(text);
// This bandwidth is averaged over roughly 10 seconds (in kbps) and therefore pretty inaccurate.
// Also, it only takes into account actual texture data (not headers etc). But all it is used for
// is for the color of some text in the texture console, so I guess it doesn't matter.
F32 bandwidth = LLAppViewer::getTextureFetch()->getTextureBandwidth();
// This is the maximum bandwidth allowed for curl transactions (of any type and averaged per second),
// that is actually used to limit the number of HTTP texture requests (and only those).
// Comparing that with 'bandwidth' is a bit like comparing apples and oranges, but again... who really cares.
F32 max_bandwidth = gSavedSettings.getF32("HTTPThrottleBandwidth");
color = bandwidth > max_bandwidth ? LLColor4::red : bandwidth > max_bandwidth*.75f ? LLColor4::yellow : text_color;
color[VALPHA] = text_color[VALPHA];

View File

@@ -792,7 +792,7 @@ void settings_setup_listeners()
gSavedSettings.getControl("AscentAvatarZModifier")->getSignal()->connect(boost::bind(&handleAscentAvatarModifier, _2));
gSavedSettings.getControl("CurlMaxTotalConcurrentConnections")->getSignal()->connect(boost::bind(&AICurlInterface::handleCurlMaxTotalConcurrentConnections, _2));
gSavedSettings.getControl("CurlConcurrentConnectionsPerHost")->getSignal()->connect(boost::bind(&AICurlInterface::handleCurlConcurrentConnectionsPerHost, _2));
gSavedSettings.getControl("CurlConcurrentConnectionsPerService")->getSignal()->connect(boost::bind(&AICurlInterface::handleCurlConcurrentConnectionsPerService, _2));
gSavedSettings.getControl("NoVerifySSLCert")->getSignal()->connect(boost::bind(&AICurlInterface::handleNoVerifySSLCert, _2));
gSavedSettings.getControl("CurlTimeoutDNSLookup")->getValidateSignal()->connect(boost::bind(&validateCurlTimeoutDNSLookup, _2));

View File

@@ -1342,6 +1342,7 @@ void LLViewerTextureList::receiveImageHeader(LLMessageSystem *msg, void **user_d
{
received_size = msg->getReceiveSize() ;
}
// Only used for statistics and texture console.
gTextureList.sTextureBits += received_size * 8;
gTextureList.sTexturePackets++;