This commit is contained in:
Siana Gearz
2013-05-13 02:43:49 +02:00
12 changed files with 174 additions and 142 deletions

View File

@@ -1270,7 +1270,7 @@ bool BufferedCurlEasyRequest::sShuttingDown = false;
AIAverage BufferedCurlEasyRequest::sHTTPBandwidth(25);
BufferedCurlEasyRequest::BufferedCurlEasyRequest() :
mRequestTransferedBytes(0), mTotalRawBytes(0), mStatus(HTTP_INTERNAL_ERROR_OTHER), mBufferEventsTarget(NULL), mQueueIfTooMuchBandwidthUsage(false)
mRequestTransferedBytes(0), mTotalRawBytes(0), mStatus(HTTP_INTERNAL_ERROR_OTHER), mBufferEventsTarget(NULL)
{
AICurlInterface::Stats::BufferedCurlEasyRequest_count++;
}

View File

@@ -287,7 +287,11 @@ void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle)
{
if (!mQueuedRequests.empty())
{
multi_handle->add_easy_request(mQueuedRequests.front());
if (!multi_handle->add_easy_request(mQueuedRequests.front(), true))
{
// Throttled.
return;
}
mQueuedRequests.pop_front();
if (mQueuedRequests.empty())
{
@@ -366,3 +370,9 @@ void AIPerService::Approvement::honored(void)
}
}
void AIPerService::Approvement::not_honored(void)
{
honored();
llwarns << "Approvement for has not been honored." << llendl;
}

View File

@@ -114,7 +114,7 @@ class AIPerService {
private:
typedef std::deque<AICurlPrivate::BufferedCurlEasyRequestPtr> queued_request_type;
int mApprovedRequests; // The number of approved requests by wantsMoreHTTPRequestsFor that were not added to the command queue yet.
int mApprovedRequests; // The number of approved requests by approveHTTPRequestFor that were not added to the command queue yet.
int mQueuedCommands; // Number of add commands (minus remove commands) with this host in the command queue.
int mAdded; // Number of active easy handles with this host.
queued_request_type mQueuedRequests; // Waiting (throttled) requests.
@@ -201,29 +201,30 @@ class AIPerService {
// Called when CurlConcurrentConnectionsPerService changes.
static void adjust_concurrent_connections(int increment);
// The two following functions are static and have the AIPerService object passed
// as first argument as an AIPerServicePtr because that avoids the need of having
// the AIPerService object locked for the whole duration of the call.
// The functions only lock it when access is required.
// Returns true if curl can handle another request for this host.
// Should return false if the maximum allowed HTTP bandwidth is reached, or when
// the latency between request and actual delivery becomes too large.
static bool wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service);
// Return true if too much bandwidth is being used.
static bool checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms);
// A helper class to decrement mApprovedRequests after requests approved by wantsMoreHTTPRequestsFor were handled.
class Approvement {
// A helper class to decrement mApprovedRequests after requests approved by approveHTTPRequestFor were handled.
class Approvement : public LLThreadSafeRefCount {
private:
AIPerServicePtr mPerServicePtr;
bool mHonored;
public:
Approvement(AIPerServicePtr const& per_service) : mPerServicePtr(per_service), mHonored(false) { }
~Approvement() { honored(); }
~Approvement() { if (!mHonored) not_honored(); }
void honored(void);
void not_honored(void);
};
// The two following functions are static and have the AIPerService object passed
// as first argument as an AIPerServicePtr because that avoids the need of having
// the AIPerService object locked for the whole duration of the call.
// The functions only lock it when access is required.
// Returns approvement if curl can handle another request for this host.
// Should return NULL if the maximum allowed HTTP bandwidth is reached, or when
// the latency between request and actual delivery becomes too large.
static Approvement* approveHTTPRequestFor(AIPerServicePtr const& per_service);
// Return true if too much bandwidth is being used.
static bool checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms);
private:
// Disallow copying.
AIPerService(AIPerService const&);

View File

@@ -130,11 +130,17 @@ class CurlEasyHandle : public boost::noncopyable, protected AICurlEasyHandleEven
// Pause and unpause a connection.
CURLcode pause(int bitmask);
// Called if this request should be queued on the curl thread when too much bandwidth is being used.
void setApproved(AIPerService::Approvement* approved) { mApproved = approved; }
// Returns false when this request should be queued by the curl thread when too much bandwidth is being used.
bool approved(void) const { return mApproved; }
// Called when a request is queued for removal. In that case a race between the actual removal
// and revoking of the callbacks is harmless (and happens for the raw non-statemachine version).
void remove_queued(void) { mQueuedForRemoval = true; }
// In case it's added after being removed.
void add_queued(void) { mQueuedForRemoval = false; }
void add_queued(void) { mQueuedForRemoval = false; if (mApproved) { mApproved->honored(); } }
#ifdef DEBUG_CURLIO
void debug(bool debug) { if (mDebug) debug_curl_remove_easy(mEasyHandle); if (debug) debug_curl_add_easy(mEasyHandle); mDebug = debug; }
@@ -146,6 +152,7 @@ class CurlEasyHandle : public boost::noncopyable, protected AICurlEasyHandleEven
mutable char* mErrorBuffer;
AIPostFieldPtr mPostField; // This keeps the POSTFIELD data alive for as long as the easy handle exists.
bool mQueuedForRemoval; // Set if the easy handle is (probably) added to the multi handle, but is queued for removal.
LLPointer<AIPerService::Approvement> mApproved; // When not set then the curl thread should check bandwidth usage and queue this request if too much is being used.
#ifdef DEBUG_CURLIO
bool mDebug;
#endif
@@ -375,9 +382,6 @@ class BufferedCurlEasyRequest : public CurlEasyRequest {
void resetState(void);
void prepRequest(AICurlEasyRequest_wat& buffered_curl_easy_request_w, AIHTTPHeaders const& headers, LLHTTPClient::ResponderPtr responder);
// Called if this request should be queued on the curl thread when too much bandwidth is being used.
void queue_if_too_much_bandwidth_usage(void) { mQueueIfTooMuchBandwidthUsage = true; }
buffer_ptr_t& getInput(void) { return mInput; }
buffer_ptr_t& getOutput(void) { return mOutput; }
@@ -419,7 +423,6 @@ class BufferedCurlEasyRequest : public CurlEasyRequest {
U32 mRequestTransferedBytes;
size_t mTotalRawBytes; // Raw body data (still, possibly, compressed) received from the server so far.
AIBufferedCurlEasyRequestEvents* mBufferEventsTarget;
bool mQueueIfTooMuchBandwidthUsage; // Set if the curl thread should check bandwidth usage and queue this request if too much is being used.
public:
static LLChannelDescriptors const sChannels; // Channel object for mInput (channel out()) and mOutput (channel in()).
@@ -456,9 +459,6 @@ class BufferedCurlEasyRequest : public CurlEasyRequest {
// Return true when prepRequest was already called and the object has not been
// invalidated as a result of calling timed_out().
bool isValid(void) const { return mResponder; }
// Returns true when this request should be queued by the curl thread when too much bandwidth is being used.
bool queueIfTooMuchBandwidthUsage(void) const { return mQueueIfTooMuchBandwidthUsage; }
};
inline ThreadSafeBufferedCurlEasyRequest* CurlEasyRequest::get_lockobj(void)

View File

@@ -1331,8 +1331,8 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w)
case cmd_boost: // FIXME: future stuff
break;
case cmd_add:
multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()), false);
PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->removed_from_command_queue();
multi_handle_w->add_easy_request(AICurlEasyRequest(command_being_processed_r->easy_request()));
break;
case cmd_remove:
PerService_wat(*AICurlEasyRequest_wat(*command_being_processed_r->easy_request())->getPerServicePtr())->added_to_command_queue(); // Not really, but this has the same effect as 'removed a remove command'.
@@ -1701,14 +1701,14 @@ CURLMsg const* MultiHandle::info_read(int* msgs_in_queue) const
static U32 curl_max_total_concurrent_connections = 32; // Initialized on start up by startCurlThread().
void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
bool MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request, bool from_queue)
{
bool throttled = true; // Default.
AIPerServicePtr per_service;
{
AICurlEasyRequest_wat curl_easy_request_w(*easy_request);
per_service = curl_easy_request_w->getPerServicePtr();
bool too_much_bandwidth = curl_easy_request_w->queueIfTooMuchBandwidthUsage() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms);
bool too_much_bandwidth = !curl_easy_request_w->approved() && AIPerService::checkBandwidthUsage(per_service, get_clock_count() * HTTPTimeout::sClockWidth_40ms);
PerService_wat per_service_w(*per_service);
if (!too_much_bandwidth && mAddedEasyRequests.size() < curl_max_total_concurrent_connections && !per_service_w->throttled())
{
@@ -1731,7 +1731,12 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
llassert(sTotalAdded == mAddedEasyRequests.size());
Dout(dc::curl, "MultiHandle::add_easy_request: Added AICurlEasyRequest " << (void*)easy_request.get_ptr().get() <<
"; now processing " << mAddedEasyRequests.size() << " easy handles [running_handles = " << AICurlInterface::Stats::running_handles << "].");
return;
return true;
}
if (from_queue)
{
// Throttled. Do not add to queue, because it is already in the queue.
return false;
}
// The request could not be added, we have to queue it.
PerService_wat(*per_service)->queue(easy_request);
@@ -1739,6 +1744,7 @@ void MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
// Not active yet, but it's no longer an error if next we try to remove the request.
AICurlEasyRequest_wat(*easy_request)->mRemovedPerCommand = false;
#endif
return true;
}
CURLMcode MultiHandle::remove_easy_request(AICurlEasyRequest const& easy_request, bool as_per_command)
@@ -2608,7 +2614,7 @@ bool AIPerService::sNoHTTPBandwidthThrottling;
// running requests (in MultiHandle::mAddedEasyRequests)).
//
//static
bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service)
AIPerService::Approvement* AIPerService::approveHTTPRequestFor(AIPerServicePtr const& per_service)
{
using namespace AICurlPrivate;
using namespace AICurlPrivate::curlthread;
@@ -2684,7 +2690,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service)
if (reject)
{
// Too many request for this service already.
return false;
return NULL;
}
// Throttle on bandwidth usage.
@@ -2692,7 +2698,7 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service)
{
// Too much bandwidth is being used, either in total or for this service.
PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all.
return false;
return NULL;
}
// Check if it's ok to get a new request based on the total number of requests and increment the threshold if appropriate.
@@ -2726,8 +2732,9 @@ bool AIPerService::wantsMoreHTTPRequestsFor(AIPerServicePtr const& per_service)
if (reject)
{
PerService_wat(*per_service)->mApprovedRequests--; // Not approved after all.
return NULL;
}
return !reject;
return new Approvement(per_service);
}
bool AIPerService::checkBandwidthUsage(AIPerServicePtr const& per_service, U64 sTime_40ms)

View File

@@ -59,7 +59,7 @@ class MultiHandle : public CurlMultiHandle
~MultiHandle();
// Add/remove an easy handle to/from a multi session.
void add_easy_request(AICurlEasyRequest const& easy_request);
bool add_easy_request(AICurlEasyRequest const& easy_request, bool from_queue);
CURLMcode remove_easy_request(AICurlEasyRequest const& easy_request, bool as_per_command = false);
// Reads/writes available data from a particular socket (non-blocking).

View File

@@ -201,15 +201,15 @@ void LLHTTPClient::request(
LLURLRequest::ERequestAction method,
Injector* body_injector,
LLHTTPClient::ResponderPtr responder,
AIHTTPHeaders& headers/*,*/
AIHTTPHeaders& headers,
AIPerService::Approvement* approved/*,*/
DEBUG_CURLIO_PARAM(EDebugCurl debug),
EKeepAlive keepalive,
EDoesAuthentication does_auth,
EAllowCompressedReply allow_compression,
AIStateMachine* parent,
AIStateMachine::state_type new_parent_state,
AIEngine* default_engine,
bool queue_if_too_much_bandwidth_usage)
AIEngine* default_engine)
{
llassert(responder);
@@ -222,7 +222,7 @@ void LLHTTPClient::request(
LLURLRequest* req;
try
{
req = new LLURLRequest(method, url, body_injector, responder, headers, keepalive, does_auth, allow_compression, queue_if_too_much_bandwidth_usage);
req = new LLURLRequest(method, url, body_injector, responder, headers, approved, keepalive, does_auth, allow_compression);
#ifdef DEBUG_CURLIO
req->mCurlEasyRequest.debug(debug);
#endif
@@ -243,22 +243,22 @@ void LLHTTPClient::getByteRange(std::string const& url, S32 offset, S32 bytes, R
{
headers.addHeader("Range", llformat("bytes=%d-%d", offset, offset + bytes - 1));
}
request(url, HTTP_GET, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug));
request(url, HTTP_GET, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug));
}
void LLHTTPClient::head(std::string const& url, ResponderHeadersOnly* responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug))
{
request(url, HTTP_HEAD, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug));
request(url, HTTP_HEAD, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug));
}
void LLHTTPClient::get(std::string const& url, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug))
{
request(url, HTTP_GET, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug));
request(url, HTTP_GET, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug));
}
void LLHTTPClient::getHeaderOnly(std::string const& url, ResponderHeadersOnly* responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug))
{
request(url, HTTP_HEAD, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug));
request(url, HTTP_HEAD, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug));
}
void LLHTTPClient::get(std::string const& url, LLSD const& query, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug))
@@ -694,22 +694,22 @@ U32 LLHTTPClient::blockingGetRaw(const std::string& url, std::string& body/*,*/
void LLHTTPClient::put(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug))
{
request(url, HTTP_PUT, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), no_keep_alive, no_does_authentication, no_allow_compressed_reply);
request(url, HTTP_PUT, new LLSDInjector(body), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), no_keep_alive, no_does_authentication, no_allow_compressed_reply);
}
void LLHTTPClient::post(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state)
{
request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state);
request(url, HTTP_POST, new LLSDInjector(body), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state);
}
void LLHTTPClient::post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state)
void LLHTTPClient::post_approved(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers, AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive, AIStateMachine* parent, AIStateMachine::state_type new_parent_state)
{
request(url, HTTP_POST, new LLSDInjector(body), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state, &gMainThreadEngine, false);
request(url, HTTP_POST, new LLSDInjector(body), responder, headers, approved/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, no_does_authentication, allow_compressed_reply, parent, new_parent_state, &gMainThreadEngine);
}
void LLHTTPClient::postXMLRPC(std::string const& url, XMLRPC_REQUEST xmlrpc_request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive)
{
request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, allow_compressed_reply);
request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, allow_compressed_reply);
}
void LLHTTPClient::postXMLRPC(std::string const& url, char const* method, XMLRPC_VALUE value, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive)
@@ -720,33 +720,33 @@ void LLHTTPClient::postXMLRPC(std::string const& url, char const* method, XMLRPC
XMLRPC_RequestSetData(xmlrpc_request, value);
// XMLRPCInjector takes ownership of xmlrpc_request and will free it when done.
// LLURLRequest takes ownership of the XMLRPCInjector object and will free it when done.
request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, no_allow_compressed_reply);
request(url, HTTP_POST, new XMLRPCInjector(xmlrpc_request), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, does_authentication, no_allow_compressed_reply);
}
void LLHTTPClient::postRaw(std::string const& url, char const* data, S32 size, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive)
{
request(url, HTTP_POST, new RawInjector(data, size), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive);
request(url, HTTP_POST, new RawInjector(data, size), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive);
}
void LLHTTPClient::postFile(std::string const& url, std::string const& filename, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive)
{
request(url, HTTP_POST, new FileInjector(filename), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive);
request(url, HTTP_POST, new FileInjector(filename), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive);
}
void LLHTTPClient::postFile(std::string const& url, LLUUID const& uuid, LLAssetType::EType asset_type, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug), EKeepAlive keepalive)
{
request(url, HTTP_POST, new VFileInjector(uuid, asset_type), responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive);
request(url, HTTP_POST, new VFileInjector(uuid, asset_type), responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive);
}
// static
void LLHTTPClient::del(std::string const& url, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug))
{
request(url, HTTP_DELETE, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug));
request(url, HTTP_DELETE, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug));
}
// static
void LLHTTPClient::move(std::string const& url, std::string const& destination, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug))
{
headers.addHeader("Destination", destination);
request(url, HTTP_MOVE, NULL, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug));
request(url, HTTP_MOVE, NULL, responder, headers, NULL/*,*/ DEBUG_CURLIO_PARAM(debug));
}

View File

@@ -38,6 +38,7 @@
#include "llassettype.h"
#include "llhttpstatuscodes.h"
#include "aihttpheaders.h"
#include "aicurlperservice.h"
class LLUUID;
class LLPumpIO;
@@ -426,15 +427,15 @@ public:
ERequestAction method,
Injector* body_injector,
ResponderPtr responder,
AIHTTPHeaders& headers/*,*/
AIHTTPHeaders& headers,
AIPerService::Approvement* approved/*,*/
DEBUG_CURLIO_PARAM(EDebugCurl debug),
EKeepAlive keepalive = keep_alive,
EDoesAuthentication does_auth = no_does_authentication,
EAllowCompressedReply allow_compression = allow_compressed_reply,
AIStateMachine* parent = NULL,
/*AIStateMachine::state_type*/ U32 new_parent_state = 0,
AIEngine* default_engine = &gMainThreadEngine,
bool queue_if_too_much_bandwidth_usage = true);
AIEngine* default_engine = &gMainThreadEngine);
/** @name non-blocking API */
//@{
@@ -466,9 +467,9 @@ public:
static void post(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0)
{ AIHTTPHeaders headers; post(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); }
static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0);
static void post_nb(std::string const& url, LLSD const& body, ResponderPtr responder/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0)
{ AIHTTPHeaders headers; post_nb(url, body, responder, headers/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); }
static void post_approved(std::string const& url, LLSD const& body, ResponderPtr responder, AIHTTPHeaders& headers, AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0);
static void post_approved(std::string const& url, LLSD const& body, ResponderPtr responder, AIPerService::Approvement* approved/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive, AIStateMachine* parent = NULL, U32 new_parent_state = 0)
{ AIHTTPHeaders headers; post_approved(url, body, responder, headers, approved/*,*/ DEBUG_CURLIO_PARAM(debug), keepalive, parent, new_parent_state); }
/** Takes ownership of request and deletes it when sent */
static void postXMLRPC(std::string const& url, XMLRPC_REQUEST request, ResponderPtr responder, AIHTTPHeaders& headers/*,*/ DEBUG_CURLIO_PARAM(EDebugCurl debug = debug_off), EKeepAlive keepalive = keep_alive);

View File

@@ -75,14 +75,14 @@ std::string LLURLRequest::actionAsVerb(LLURLRequest::ERequestAction action)
// This might throw AICurlNoEasyHandle.
LLURLRequest::LLURLRequest(LLURLRequest::ERequestAction action, std::string const& url, Injector* body,
LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, bool keepalive, bool is_auth, bool compression,
bool queue_if_too_much_bandwidth_usage) :
LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers, AIPerService::Approvement* approved,
bool keepalive, bool is_auth, bool compression) :
mAction(action), mURL(url), mKeepAlive(keepalive), mIsAuth(is_auth), mNoCompression(!compression),
mBody(body), mResponder(responder), mHeaders(headers), mResponderNameCache(responder ? responder->getName() : "<uninitialized>")
{
if (queue_if_too_much_bandwidth_usage)
if (approved)
{
AICurlEasyRequest_wat(*mCurlEasyRequest)->queue_if_too_much_bandwidth_usage();
AICurlEasyRequest_wat(*mCurlEasyRequest)->setApproved(approved);
}
}

View File

@@ -66,7 +66,8 @@ class LLURLRequest : public AICurlEasyRequestStateMachine {
*/
LLURLRequest(ERequestAction action, std::string const& url, Injector* body,
LLHTTPClient::ResponderPtr responder, AIHTTPHeaders& headers,
bool keepalive, bool is_auth, bool no_compression, bool queue_if_too_much_bandwidth_usage);
AIPerService::Approvement* approved,
bool keepalive, bool is_auth, bool no_compression);
/**
* @brief Cached value of responder->getName() as passed to the constructor.

View File

@@ -204,7 +204,8 @@ void LLInventoryModelBackgroundFetch::backgroundFetch()
std::string url = region->getCapability("FetchInventory2");
if (!url.empty())
{
if (!mPerServicePtr)
bool mPerServicePtr_initialized = !!mPerServicePtr;
if (!mPerServicePtr_initialized)
{
// One time initialization needed for bulkFetch().
std::string servicename = AIPerService::extract_canonical_servicename(url);
@@ -212,10 +213,14 @@ void LLInventoryModelBackgroundFetch::backgroundFetch()
{
llinfos << "Initialized service name for bulk inventory fetching with \"" << servicename << "\"." << llendl;
mPerServicePtr = AIPerService::instance(servicename);
mPerServicePtr_initialized = true;
}
}
bulkFetch();
return;
if (mPerServicePtr_initialized)
{
bulkFetch();
return;
}
}
}
@@ -600,30 +605,29 @@ void LLInventoryModelBackgroundFetch::bulkFetch()
LLViewerRegion* region = gAgent.getRegion();
if (gDisconnected || !region) return;
if (!AIPerService::wantsMoreHTTPRequestsFor(mPerServicePtr))
{
return; // Wait.
}
// If AIPerService::wantsMoreHTTPRequestsFor returns true, then it approved ONE request.
// The code below might fire off zero, one or even more than one requests however!
// This object keeps track of that.
AIPerService::Approvement approvement(mPerServicePtr);
U32 item_count=0;
U32 folder_count=0;
U32 max_batch_size=5;
U32 const max_batch_size = 5;
U32 sort_order = gSavedSettings.getU32(LLInventoryPanel::DEFAULT_SORT_ORDER) & 0x1;
uuid_vec_t recursive_cats;
U32 folder_count=0;
U32 folder_lib_count=0;
U32 item_count=0;
U32 item_lib_count=0;
// This function can do up to four requests at once.
LLPointer<AIPerService::Approvement> approved_folder;
LLPointer<AIPerService::Approvement> approved_folder_lib;
LLPointer<AIPerService::Approvement> approved_item;
LLPointer<AIPerService::Approvement> approved_item_lib;
LLSD folder_request_body;
LLSD folder_request_body_lib;
LLSD item_request_body;
LLSD item_request_body_lib;
while (!mFetchQueue.empty()
&& (item_count + folder_count) < max_batch_size)
while (!mFetchQueue.empty())
{
const FetchQueueInfo& fetch_info = mFetchQueue.front();
if (fetch_info.mIsCategory)
@@ -645,10 +649,27 @@ void LLInventoryModelBackgroundFetch::bulkFetch()
folder_sd["fetch_items"] = (LLSD::Boolean)TRUE;
if (ALEXANDRIA_LINDEN_ID == cat->getOwnerID())
{
if (folder_lib_count == max_batch_size ||
(folder_lib_count == 0 &&
!(approved_folder_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr))))
{
break;
}
folder_request_body_lib["folders"].append(folder_sd);
++folder_lib_count;
}
else
{
if (folder_count == max_batch_size ||
(folder_count == 0 &&
!(approved_folder = AIPerService::approveHTTPRequestFor(mPerServicePtr))))
{
break;
}
folder_request_body["folders"].append(folder_sd);
folder_count++;
++folder_count;
}
}
// May already have this folder, but append child folders to list.
if (fetch_info.mRecursive)
@@ -678,77 +699,69 @@ void LLInventoryModelBackgroundFetch::bulkFetch()
item_sd["item_id"] = itemp->getUUID();
if (itemp->getPermissions().getOwner() == gAgent.getID())
{
if (item_count == max_batch_size ||
(item_count == 0 &&
!(approved_item = AIPerService::approveHTTPRequestFor(mPerServicePtr))))
{
break;
}
item_request_body.append(item_sd);
++item_count;
}
else
{
if (item_lib_count == max_batch_size ||
(item_lib_count == 0 &&
!(approved_item_lib = AIPerService::approveHTTPRequestFor(mPerServicePtr))))
{
break;
}
item_request_body_lib.append(item_sd);
++item_lib_count;
}
//itemp->fetchFromServer();
item_count++;
}
}
mFetchQueue.pop_front();
}
if (item_count + folder_count > 0)
if (item_count + folder_count + item_lib_count + folder_lib_count > 0)
{
if (folder_count)
{
std::string url = region->getCapability("FetchInventoryDescendents2");
mFetchCount++;
if (folder_request_body["folders"].size())
{
LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body, recursive_cats);
LLHTTPClient::post_nb(url, folder_request_body, fetcher);
approvement.honored();
}
if (folder_request_body_lib["folders"].size())
{
std::string url_lib = gAgent.getRegion()->getCapability("FetchLibDescendents2");
LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body_lib, recursive_cats);
LLHTTPClient::post_nb(url_lib, folder_request_body_lib, fetcher);
approvement.honored();
}
llassert(!url.empty());
++mFetchCount;
LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body, recursive_cats);
LLHTTPClient::post_approved(url, folder_request_body, fetcher, approved_folder);
}
if (folder_lib_count)
{
std::string url = gAgent.getRegion()->getCapability("FetchLibDescendents2");
llassert(!url.empty());
++mFetchCount;
LLInventoryModelFetchDescendentsResponder *fetcher = new LLInventoryModelFetchDescendentsResponder(folder_request_body_lib, recursive_cats);
LLHTTPClient::post_approved(url, folder_request_body_lib, fetcher, approved_folder_lib);
}
if (item_count)
{
std::string url;
if (item_request_body.size())
{
mFetchCount++;
url = region->getCapability("FetchInventory2");
llassert(!url.empty());
if (!url.empty())
{
LLSD body;
body["agent_id"] = gAgent.getID();
body["items"] = item_request_body;
LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body));
approvement.honored();
}
}
if (item_request_body_lib.size())
{
mFetchCount++;
url = region->getCapability("FetchLib2");
llassert(!url.empty());
if (!url.empty())
{
LLSD body;
body["agent_id"] = gAgent.getID();
body["items"] = item_request_body_lib;
LLHTTPClient::post_nb(url, body, new LLInventoryModelFetchItemResponder(body));
approvement.honored();
}
}
std::string url = region->getCapability("FetchInventory2");
llassert(!url.empty());
++mFetchCount;
LLSD body;
body["agent_id"] = gAgent.getID();
body["items"] = item_request_body;
LLHTTPClient::post_approved(url, body, new LLInventoryModelFetchItemResponder(body), approved_item);
}
if (item_lib_count)
{
std::string url = region->getCapability("FetchLib2");
llassert(!url.empty());
++mFetchCount;
LLSD body;
body["agent_id"] = gAgent.getID();
body["items"] = item_request_body_lib;
LLHTTPClient::post_approved(url, body, new LLInventoryModelFetchItemResponder(body), approved_item_lib);
}
mFetchTimer.reset();
}

View File

@@ -1272,13 +1272,14 @@ bool LLTextureFetchWorker::doWork(S32 param)
}
// Let AICurl decide if we can process more HTTP requests at the moment or not.
if (!AIPerService::wantsMoreHTTPRequestsFor(mPerServicePtr))
// AIPerService::approveHTTPRequestFor returns approvement for ONE request.
// This object keeps track of whether or not that is honored.
LLPointer<AIPerService::Approvement> approved = AIPerService::approveHTTPRequestFor(mPerServicePtr);
if (!approved)
{
return false ; //wait.
}
// If AIPerService::wantsMoreHTTPRequestsFor returns true then it approved ONE request.
// This object keeps track of whether or not that is honored.
AIPerService::Approvement approvement(mPerServicePtr);
mFetcher->removeFromNetworkQueue(this, false);
@@ -1324,9 +1325,7 @@ bool LLTextureFetchWorker::doWork(S32 param)
}
LLHTTPClient::request(mUrl, LLHTTPClient::HTTP_GET, NULL,
new HTTPGetResponder(mFetcher, mID, LLTimer::getTotalTime(), mRequestedSize, mRequestedOffset, true),
headers/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL, false);
// Now the request was added to the command queue.
approvement.honored();
headers, approved/*,*/ DEBUG_CURLIO_PARAM(debug_off), keep_alive, no_does_authentication, allow_compressed_reply, NULL, 0, NULL);
res = true;
}
if (!res)