HTTP timeout work in progress.

* Remove progress meter call back, use read/write/header callbacks instead.
* Don't use timeout_lowspeed for ReplyDelay, instead use:
* Add timeout stuff to the main loop (CurlEasyRequest::mTimeoutStalled).

This patch fixes a few things compared to the previous version.
More things need to be fixed.
This commit is contained in:
Aleric Inglewood
2012-10-08 02:19:32 +02:00
parent a40fbf0da1
commit 1252b0bde2
4 changed files with 486 additions and 370 deletions

View File

@@ -70,7 +70,9 @@
marker << (void*)this->get_lockobj(); \
libcw_do.push_marker(); \
libcw_do.marker().assign(marker.str().data(), marker.str().size()); \
libcw_do.inc_indent(2); \
Dout(dc::curl, x); \
libcw_do.dec_indent(2); \
libcw_do.pop_marker(); \
} while(0)
#define DoutCurlEasyEntering(x) do { \
@@ -79,7 +81,9 @@
marker << (void*)this->get_lockobj(); \
libcw_do.push_marker(); \
libcw_do.marker().assign(marker.str().data(), marker.str().size()); \
libcw_do.inc_indent(2); \
DoutEntering(dc::curl, x); \
libcw_do.dec_indent(2); \
libcw_do.pop_marker(); \
} while(0)
#else // !CWDEBUG
@@ -810,9 +814,9 @@ DEFINE_FUNCTION_SETOPT1(curl_debug_callback, CURLOPT_DEBUGFUNCTION)
DEFINE_FUNCTION_SETOPT4(curl_write_callback, CURLOPT_HEADERFUNCTION, CURLOPT_WRITEFUNCTION, CURLOPT_INTERLEAVEFUNCTION, CURLOPT_READFUNCTION)
//DEFINE_FUNCTION_SETOPT1(curl_read_callback, CURLOPT_READFUNCTION)
DEFINE_FUNCTION_SETOPT1(curl_ssl_ctx_callback, CURLOPT_SSL_CTX_FUNCTION)
DEFINE_FUNCTION_SETOPT1(curl_progress_callback, CURLOPT_PROGRESSFUNCTION)
DEFINE_FUNCTION_SETOPT3(curl_conv_callback, CURLOPT_CONV_FROM_NETWORK_FUNCTION, CURLOPT_CONV_TO_NETWORK_FUNCTION, CURLOPT_CONV_FROM_UTF8_FUNCTION)
#if 0 // Not used by the viewer.
DEFINE_FUNCTION_SETOPT1(curl_progress_callback, CURLOPT_PROGRESSFUNCTION)
DEFINE_FUNCTION_SETOPT1(curl_seek_callback, CURLOPT_SEEKFUNCTION)
DEFINE_FUNCTION_SETOPT1(curl_ioctl_callback, CURLOPT_IOCTLFUNCTION)
DEFINE_FUNCTION_SETOPT1(curl_sockopt_callback, CURLOPT_SOCKOPTFUNCTION)
@@ -938,24 +942,6 @@ void CurlEasyRequest::setSSLCtxCallback(curl_ssl_ctx_callback callback, void* us
setopt(CURLOPT_SSL_CTX_DATA, this);
}
//static
int CurlEasyRequest::progressCallback(void* userdata, double dltotal, double dlnow, double ultotal, double ulnow)
{
CurlEasyRequest* self = static_cast<CurlEasyRequest*>(userdata);
ThreadSafeCurlEasyRequest* lockobj = self->get_lockobj();
AICurlEasyRequest_wat lock_self(*lockobj);
return self->mProgressCallback(self->mProgressCallbackUserData, dltotal, dlnow, ultotal, ulnow);
}
void CurlEasyRequest::setProgressCallback(curl_progress_callback callback, void* userdata)
{
mProgressCallback = callback;
mProgressCallbackUserData = userdata;
setopt(CURLOPT_PROGRESSFUNCTION, callback ? &CurlEasyRequest::progressCallback : NULL);
setopt(CURLOPT_PROGRESSDATA, userdata ? this : NULL);
setopt(CURLOPT_NOPROGRESS, callback ? 0L: 1L);
}
#define llmaybewarns lllog(LLApp::isExiting() ? LLError::LEVEL_INFO : LLError::LEVEL_WARN, NULL, NULL, false, true)
static size_t noHeaderCallback(char* ptr, size_t size, size_t nmemb, void* userdata)
@@ -982,19 +968,12 @@ static CURLcode noSSLCtxCallback(CURL* curl, void* sslctx, void* parm)
return CURLE_ABORTED_BY_CALLBACK;
}
static int noProgressCallback(void* userdata, double dltotal, double dlnow, double ultotal, double ulnow)
{
llmaybewarns << "Calling noProgressCallback(); curl session aborted." << llendl;
return CURLE_ABORTED_BY_CALLBACK;
}
void CurlEasyRequest::revokeCallbacks(void)
{
if (mHeaderCallback == &noHeaderCallback &&
mWriteCallback == &noWriteCallback &&
mReadCallback == &noReadCallback &&
mSSLCtxCallback == &noSSLCtxCallback &&
mProgressCallback == &noProgressCallback)
mSSLCtxCallback == &noSSLCtxCallback)
{
// Already revoked.
return;
@@ -1003,7 +982,6 @@ void CurlEasyRequest::revokeCallbacks(void)
mWriteCallback = &noWriteCallback;
mReadCallback = &noReadCallback;
mSSLCtxCallback = &noSSLCtxCallback;
mProgressCallback = &noProgressCallback;
if (active() && !no_warning())
{
llwarns << "Revoking callbacks on a still active CurlEasyRequest object!" << llendl;
@@ -1012,7 +990,6 @@ void CurlEasyRequest::revokeCallbacks(void)
curl_easy_setopt(getEasyHandle(), CURLOPT_WRITEHEADER, &noWriteCallback);
curl_easy_setopt(getEasyHandle(), CURLOPT_READFUNCTION, &noReadCallback);
curl_easy_setopt(getEasyHandle(), CURLOPT_SSL_CTX_FUNCTION, &noSSLCtxCallback);
curl_easy_setopt(getEasyHandle(), CURLOPT_PROGRESSFUNCTION, &noProgressCallback);
}
CurlEasyRequest::~CurlEasyRequest()
@@ -1339,146 +1316,233 @@ void CurlEasyRequest::finalizeRequest(std::string const& url, AIHTTPTimeoutPolic
//.............................................................................
// HTTP Timeout stuff
//static
F64 const CurlEasyRequest::sTimeoutClockWidth = 1.0 / calc_clock_frequency(); // Time between two clock ticks, in seconds.
U64 CurlEasyRequest::sTimeoutClockCount; // Clock count, set once per select() exit.
// CURL-THREAD
// This is called when the easy handle is actually being added to the multi handle (thus after being queued).
// AIFIXME: Doing this only when it is actually being added assures that the first curl easy handle that is
// being added for a particular host will be the one getting extra 'DNS lookup' connect time.
// However, if another curl easy handle for the same host is added immediately after, it will
// get less connect time, while it still (also) has to wait for this DNS lookup.
//
// <-----------------------------mTimeoutNothingReceivedYet-------------------------->
// queued--><--DNS lookup + connect + send headers-->[<--send body (if any)-->]<--replydelay--><--receive headers + body--><--done
// ^
// |
void CurlEasyRequest::timeout_add_easy_request(void)
{
setopt(CURLOPT_CONNECTTIMEOUT, mTimeoutPolicy->getConnectTimeout(mTimeoutLowercaseHostname));
setopt(CURLOPT_TIMEOUT, mTimeoutPolicy->getCurlTransaction());
// We do NOT use CURLOPT_LOW_SPEED_TIME and CURLOPT_LOW_SPEED_LIMIT,
// instead use a progress meter callback.
setProgressCallback(&AICurlPrivate::CurlEasyRequest::timeout_progress, this);
// This boolean is valid (only) if we get a time out event from libcurl.
mTimeoutConnected = false;
}
// CURL-THREAD
// This is called when the connection succeeded (thus after DNS lookup and connect).
void CurlEasyRequest::timeout_connected(void)
{
DoutCurlEasy("Calling CurlEasyRequest::timeout_connected(): mTimeoutWaitingForReply = false");
mTimeoutConnected = true;
#ifdef CWDEBUG
mTimeout_connect_time = get_clock_count();
#endif
// Now that mTimeoutConnected is set we'll be calling timeout_progress(); initialize the variables used there.
mTimeout_dlnow = 0;
mTimeout_ulnow = 0;
mTimeout_progress_time = 0;
mTimeoutWaitingForReply = false;
mTimeoutNothingReceivedYet = true;
mTimeoutStalled = (U64)-1;
}
// CURL-THREAD
// This is called when data was sent to the server socket.
void CurlEasyRequest::timeout_data_sent(size_t n)
// This is called when body data was sent to the server socket.
// <--mTimeoutLowSpeedOn-->
// queued--><--DNS lookup + connect + send headers-->[<--send body (if any)-->]<--replydelay--><--receive headers + body--><--done
// ^ ^ ^ ^ ^ ^
// | | | | | |
bool CurlEasyRequest::timeout_data_sent(size_t n)
{
if (!mTimeoutConnected)
// Generate events.
if (!mTimeoutLowSpeedOn)
{
// If we can send data (for the first time) then that's our way to know we connected.
timeout_connected();
// If we can send data (for the first time) then that's our only way to know we connected.
timeout_reset_lowspeed();
}
// Detect low speed.
return timeout_lowspeed(n);
}
// CURL-THREAD
// This is called when the 'low speed' timer should be started.
// <--mTimeoutLowSpeedOn--> <----mTimeoutLowSpeedOn---->
// queued--><--DNS lookup + connect + send headers-->[<--send body (if any)-->]<--replydelay--><--receive headers + body--><--done
// ^ ^
// | |
void CurlEasyRequest::timeout_reset_lowspeed(void)
{
mTimeoutLowSpeedClock = get_clock_count();
mTimeoutLowSpeedOn = true;
mTimeoutLastSecond = -1; // This causes timeout_lowspeed to initialize the rest.
mTimeoutStalled = (U64)-1; // Stop reply delay timer.
}
// CURL-THREAD
// This is called when everything we had to send to the server has been sent.
// <--mTimeoutLowSpeedOn-->
// queued--><--DNS lookup + connect + send headers-->[<--send body (if any)-->]<--replydelay--><--receive headers + body--><--done
// ^
// |
void CurlEasyRequest::timeout_upload_finished(void)
{
// We finished uploading (if there was a body to upload at all), so not more transfer rate timeouts.
mTimeoutLowSpeedOn = false;
// Timeout if the server doesn't reply quick enough.
mTimeoutStalled = sTimeoutClockCount + mTimeoutPolicy->getReplyDelay() / sTimeoutClockWidth;
}
// CURL-THREAD
// This is called when data was received from the server.
void CurlEasyRequest::timeout_data_received(size_t n)
{
if (n > 0)
{
#ifdef CWDEBUG
if (mTimeoutNothingReceivedYet)
DoutCurlEasy("CurlEasyRequest::timeout_data_received(): Received data from server: set mTimeoutWaitingForReply = false");
#endif
// We received something, now switch to getLowSpeedLimit()/getLowSpeedTime().
mTimeoutWaitingForReply = false;
mTimeoutNothingReceivedYet = false;
}
}
// CURL_THREAD
//static
int CurlEasyRequest::timeout_progress(void* userdata, double dltotal, double dlnow, double ultotal, double ulnow)
{
CurlEasyRequest* self = static_cast<CurlEasyRequest*>(userdata);
//AIFIXME: There has to be a better way to determine this (because it feels fuzzy, I only allow it to SET the boolean):
#ifdef CWDEBUG
if (!self->mTimeoutWaitingForReply && (self->mTimeoutNothingReceivedYet && ultotal > 0 && ulnow == ultotal))
Dout(dc::curl, "CurlEasyRequest::timeout_progress(): uploading data finished: set mTimeoutWaitingForReply = true [" << (void*)self->get_lockobj() << ']');
#endif
llassert(ulnow == 0 || ultotal > 0); // Do we always know the total upload size?
self->mTimeoutWaitingForReply = self->mTimeoutWaitingForReply || (self->mTimeoutNothingReceivedYet && ultotal > 0 && ulnow == ultotal);
return self->mTimeoutConnected ? self->timeout_progress(dlnow, ulnow) : 0;
}
// CURL_THREAD
// dlnow is the number of bytes of the BODY of the message, received from the server.
// ulnow is the number of bytes of the BODY of the message, sent to the server so far.
//
// Note that the algorithm used here is basically the same as libcurl uses
// for CURLOPT_LOW_SPEED_LIMIT / CURLOPT_LOW_SPEED_TIME, but we can't use that
// because we need to change the variables involved during transfer, which isn't
// officially supported by libcurl.
// Libcurl does the progress callback at the exact same point as checking for
// low speed, and the same data is passed (except the time, unfortunately), so
// the functionality is the same.
int CurlEasyRequest::timeout_progress(double dlnow, double ulnow)
// <-----------------------------mTimeoutNothingReceivedYet--------------------------><----mTimeoutLowSpeedOn---->
// queued--><--DNS lookup + connect + send headers-->[<--send body (if any)-->]<--replydelay--><--receive headers + body--><--done
// ^ ^ ^ ^ ^ ^ ^ ^
// | | | | | | | |
bool CurlEasyRequest::timeout_data_received(size_t n)
{
static double const clock_frequency = calc_clock_frequency();
U64 last_time = mTimeout_progress_time;
mTimeout_progress_time = get_clock_count();
double transfer = (dlnow - mTimeout_dlnow) + (ulnow - mTimeout_ulnow); // Just combine up and download :p.
mTimeout_dlnow = dlnow;
mTimeout_ulnow = ulnow;
if (!last_time || mTimeout_progress_time == last_time) // Is this the first time this function is called (or are we called infinitely fast)?
// The HTTP header of the reply is the first thing we receive.
if (mTimeoutNothingReceivedYet && n > 0)
{
DoutCurlEasy("timeout_progress(" << dlnow << ", " << ulnow << ") called at " <<
((mTimeout_progress_time - mTimeout_connect_time) / clock_frequency) << " seconds after connect" <<
(last_time ? "" : " (first time)"));
// Start the timer: we need to have a too low transfer rate, for at least
// mTimeoutPolicy->getLowSpeedTime() seconds, counting from this moment.
mTransferOK = mTimeout_progress_time;
return 0;
mTimeoutNothingReceivedYet = false;
// We received something; switch to getLowSpeedLimit()/getLowSpeedTime().
timeout_reset_lowspeed();
}
transfer *= clock_frequency / (mTimeout_progress_time - last_time); // Bytes per second.
U64 timer = mTimeout_progress_time - mTransferOK; // Low speed timer in clock ticks.
U16 lowspeedtime = mTimeoutWaitingForReply ? mTimeoutPolicy->getReplyDelay() : mTimeoutPolicy->getLowSpeedTime();
U32 lowspeedlimit = mTimeoutWaitingForReply ? (U32)1 : mTimeoutPolicy->getLowSpeedLimit();
DoutCurlEasy("timeout_progress(" << dlnow << ", " << ulnow << ") called at " << ((mTimeout_progress_time - mTimeout_connect_time) / clock_frequency) <<
" seconds after connect (timer = " << (timer / clock_frequency) << " s ; transfer = " << transfer << " bytes/s; lowspeedtime = " << lowspeedtime <<
"; lowspeedlimit = " << lowspeedlimit << " (mTimeoutWaitingForReply == " << (mTimeoutWaitingForReply ? "true" : "false") << "))");
if (transfer >= lowspeedlimit)
return mTimeoutLowSpeedOn ? timeout_lowspeed(n) : false;
}
// CURL_THREAD
// bytes is the number of bytes we just sent or received (including headers).
// Returns true if the transfer should be aborted.
//
// queued--><--DNS lookup + connect + send headers-->[<--send body (if any)-->]<--replydelay--><--receive headers + body--><--done
// ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
// | | | | | | | | | | | | | |
bool CurlEasyRequest::timeout_lowspeed(size_t bytes)
{
DoutCurlEasyEntering("CurlEasyRequest::timeout_lowspeed(" << bytes << ")");
// The algorithm to determine if we timed out if different from how libcurls CURLOPT_LOW_SPEED_TIME works.
//
// libcurl determines the transfer rate since the last call to an equivalent 'lowspeed' function, and then
// triggers a timeout if CURLOPT_LOW_SPEED_TIME long such a transfer value is less than CURLOPT_LOW_SPEED_LIMIT.
// That doesn't work right because once there IS data it can happen that this function is called a few
// times (with less than a milisecond in between) causing seemingly VERY high "transfer rate" spikes.
// The only correct way to determine the transfer rate is to actually average over CURLOPT_LOW_SPEED_TIME
// seconds.
//
// We do this as follows: we create low_speed_time (in seconds) buckets and fill them with the number
// of bytes received during that second. We also keep track of the sum of all bytes received between 'now'
// and 'now - llmax(starttime, low_speed_time)'. Then if that period reaches at least low_speed_time
// seconds, and the transfer rate (sum / low_speed_time) is less than low_speed_limit, we abort.
// When are we?
S32 second = (sTimeoutClockCount - mTimeoutLowSpeedClock) * sTimeoutClockWidth;
// If this is the same second as last time, just add the number of bytes to the current bucket.
if (second == mTimeoutLastSecond)
{
// Yay! Transfer rate is OK; restart timeout timer.
mTransferOK = mTimeout_progress_time;
mTimeoutTotalBytes += bytes;
mTimeoutBuckets[mTimeoutBucket] += bytes;
return false;
}
else if (timer > clock_frequency * lowspeedtime)
// We arrived at a new second.
// The below is at most executed once per second, even though for
// every currently connected transfer, CPU is not a big issue.
// Determine the number of buckets needed and increase the number of buckets if needed.
U16 const low_speed_time = mTimeoutPolicy->getLowSpeedTime();
if (low_speed_time > mTimeoutBuckets.size())
{
// We haven't seen a high enough transfer rate for too long. Abort the transfer.
llwarns <<
mTimeoutBuckets.resize(low_speed_time, 0);
}
S32 s = mTimeoutLastSecond;
mTimeoutLastSecond = second;
// If this is the first time this function is called, we need to do some initialization.
if (s == -1)
{
mTimeoutBucket = 0; // It doesn't really matter where we start.
mTimeoutTotalBytes = bytes;
mTimeoutBuckets[mTimeoutBucket] = bytes;
return false;
}
// Update all administration.
U16 bucket = mTimeoutBucket;
while(1) // Run over all the seconds that were skipped.
{
if (++bucket == low_speed_time)
bucket = 0;
if (++s == second)
break;
mTimeoutTotalBytes -= mTimeoutBuckets[bucket];
mTimeoutBuckets[bucket] = 0;
}
mTimeoutBucket = bucket;
mTimeoutTotalBytes -= mTimeoutBuckets[mTimeoutBucket];
mTimeoutTotalBytes += bytes;
mTimeoutBuckets[mTimeoutBucket] = bytes;
// Check if we timed out.
U32 const low_speed_limit = mTimeoutPolicy->getLowSpeedLimit();
U32 mintotalbytes = low_speed_limit * low_speed_time;
DoutCurlEasy("Transfered " << mTimeoutTotalBytes << " bytes in " << llmin(second, (S32)low_speed_time) << " seconds after " << second << " second" << ((second == 1) ? "" : "s") << ".");
if (second >= low_speed_time)
{
DoutCurlEasy("Average transfer rate is " << (mTimeoutTotalBytes / low_speed_time) << " bytes/s (low speed limit is " << low_speed_limit << " bytes/s)");
if (mTimeoutTotalBytes < mintotalbytes)
{
// The average transfer rate over the passed low_speed_time seconds is too low. Abort the transfer.
llwarns <<
#ifdef CWDEBUG
(void*)get_lockobj() << ": "
(void*)get_lockobj() << ": "
#endif
"aborting slow connection (transfer rate below " << lowspeedlimit <<
" for more than " << lowspeedtime << " second" << ((lowspeedtime == 1) ? "" : "s") << ")." << llendl;
return 1;
"aborting slow connection (average transfer rate below " << low_speed_limit <<
" for more than " << low_speed_time << " second" << ((low_speed_time == 1) ? "" : "s") << ")." << llendl;
return true;
}
}
return 0;
// Calculate how long the data transfer may stall until we should timeout.
llassert_always(mintotalbytes > 0);
S32 max_stall_time = 0;
U32 dropped_bytes = 0;
while(1)
{
if (++bucket == low_speed_time) // The next second the next bucket will be emptied.
bucket = 0;
++max_stall_time;
dropped_bytes += mTimeoutBuckets[bucket];
// Note how, when max_stall_time == low_speed_time, dropped_bytes has
// to be equal to mTimeoutTotalBytes, the sum of all vector elements.
llassert_always(max_stall_time < low_speed_time || dropped_bytes == mTimeoutTotalBytes);
// And thus the following will certainly abort.
if (second + max_stall_time >= low_speed_time && mTimeoutTotalBytes - dropped_bytes < mintotalbytes)
break;
}
// If this function isn't called until mTimeoutStalled, we stalled.
mTimeoutStalled = sTimeoutClockCount + max_stall_time / sTimeoutClockWidth;
return false;
}
// CURL-THREAD
// This is called immediately before done() after curl finished, with code.
// <----mTimeoutLowSpeedOn---->
// queued--><--DNS lookup + connect + send headers-->[<--send body (if any)-->]<--replydelay--><--receive headers + body--><--done
// ^
// |
void CurlEasyRequest::timeout_done(CURLcode code)
{
if (code == CURLE_OPERATION_TIMEDOUT && !mTimeoutConnected)
// AIFIXME: checking mTimeoutConnected doesn't work: CURL_POLL_OUT is set when connect() is called and always
// reset before we get here causing timeout_upload_finished() to be called: mTimeoutConnected will always be true.
// Also mTimeoutNothingReceivedYet is hardly accurate, as what we really want to know is if the DNS failed.
if (code == CURLE_OPERATION_TIMEDOUT && mTimeoutNothingReceivedYet)
{
// Inform policy object that there might be problems with this host.
AIHTTPTimeoutPolicy::connect_timed_out(mTimeoutLowercaseHostname);
// AIFIXME: use return value to change priority
}
// Abuse this boolean to tell any subsequent call to timeout_progress that this certainly can't timeout anymore.
mTimeoutConnected = false;
// Make sure no timeout will happen anymore.
mTimeoutLowSpeedOn = false;
mTimeoutStalled = (U64)-1;
}
// End of HTTP Timeout stuff.
@@ -1643,8 +1707,12 @@ size_t CurlResponderBuffer::curlWriteCallback(char* data, size_t size, size_t nm
// CurlResponderBuffer::setBodyLimit is never called, so buffer_w->mBodyLimit is infinite.
//S32 bytes = llmin(size * nmemb, buffer_w->mBodyLimit); buffer_w->mBodyLimit -= bytes;
buffer_w->getOutput()->append(sChannels.in(), (U8 const*)data, bytes);
buffer_w->mResponseTransferedBytes += bytes; // Accumulate data received from the server.
buffered_easy_request_w->timeout_data_received(bytes); // Timeout administration.
buffer_w->mResponseTransferedBytes += bytes; // Accumulate data received from the server.
if (buffered_easy_request_w->timeout_data_received(bytes)) // Update timeout administration.
{
// Transfer timed out. Return 0 which will abort with error CURLE_WRITE_ERROR.
return 0;
}
return bytes;
}
@@ -1660,8 +1728,12 @@ size_t CurlResponderBuffer::curlReadCallback(char* data, size_t size, size_t nme
S32 bytes = size * nmemb; // The maximum amount to read.
AICurlResponderBuffer_wat buffer_w(*lockobj);
buffer_w->mLastRead = buffer_w->getInput()->readAfter(sChannels.out(), buffer_w->mLastRead, (U8*)data, bytes);
buffer_w->mRequestTransferedBytes += bytes; // Accumulate data sent to the server.
buffered_easy_request_w->timeout_data_sent(bytes); // Timeout administration.
buffer_w->mRequestTransferedBytes += bytes; // Accumulate data sent to the server.
if (buffered_easy_request_w->timeout_data_sent(bytes)) // Timeout administration.
{
// Transfer timed out. Return CURL_READFUNC_ABORT which will abort with error CURLE_ABORTED_BY_CALLBACK.
return CURL_READFUNC_ABORT;
}
return bytes; // Return the amount actually read (might be lowered by readAfter()).
}
@@ -1678,8 +1750,11 @@ size_t CurlResponderBuffer::curlHeaderCallback(char* data, size_t size, size_t n
char const* const header_line = static_cast<char const*>(data);
size_t const header_len = size * nmemb;
buffered_easy_request_w->timeout_data_received(header_len); // Timeout administration.
if (buffered_easy_request_w->timeout_data_received(header_len)) // Update timeout administration.
{
// Transfer timed out. Return 0 which will abort with error CURLE_WRITE_ERROR.
return 0;
}
if (!header_len)
{
return header_len;

View File

@@ -39,290 +39,303 @@ class AIHTTPTimeoutPolicy;
class AICurlEasyRequestStateMachine;
namespace AICurlPrivate {
namespace curlthread { class MultiHandle; }
namespace curlthread { class MultiHandle; }
struct Stats {
static LLAtomicU32 easy_calls;
static LLAtomicU32 easy_errors;
static LLAtomicU32 easy_init_calls;
static LLAtomicU32 easy_init_errors;
static LLAtomicU32 easy_cleanup_calls;
static LLAtomicU32 multi_calls;
static LLAtomicU32 multi_errors;
struct Stats {
static LLAtomicU32 easy_calls;
static LLAtomicU32 easy_errors;
static LLAtomicU32 easy_init_calls;
static LLAtomicU32 easy_init_errors;
static LLAtomicU32 easy_cleanup_calls;
static LLAtomicU32 multi_calls;
static LLAtomicU32 multi_errors;
static void print(void);
};
static void print(void);
};
void handle_multi_error(CURLMcode code);
inline CURLMcode check_multi_code(CURLMcode code) { Stats::multi_calls++; if (code != CURLM_OK) handle_multi_error(code); return code; }
void handle_multi_error(CURLMcode code);
inline CURLMcode check_multi_code(CURLMcode code) { Stats::multi_calls++; if (code != CURLM_OK) handle_multi_error(code); return code; }
bool curlThreadIsRunning(void);
void wakeUpCurlThread(void);
void stopCurlThread(void);
bool curlThreadIsRunning(void);
void wakeUpCurlThread(void);
void stopCurlThread(void);
class ThreadSafeCurlEasyRequest;
class ThreadSafeBufferedCurlEasyRequest;
class ThreadSafeCurlEasyRequest;
class ThreadSafeBufferedCurlEasyRequest;
#define DECLARE_SETOPT(param_type) \
CURLcode setopt(CURLoption option, param_type parameter)
CURLcode setopt(CURLoption option, param_type parameter)
// This class wraps CURL*'s.
// It guarantees that a pointer is cleaned up when no longer needed, as required by libcurl.
class CurlEasyHandle : public boost::noncopyable, protected AICurlEasyHandleEvents {
public:
CurlEasyHandle(void);
~CurlEasyHandle();
// This class wraps CURL*'s.
// It guarantees that a pointer is cleaned up when no longer needed, as required by libcurl.
class CurlEasyHandle : public boost::noncopyable, protected AICurlEasyHandleEvents {
public:
CurlEasyHandle(void);
~CurlEasyHandle();
private:
// Disallow assignment.
CurlEasyHandle& operator=(CurlEasyHandle const*);
private:
// Disallow assignment.
CurlEasyHandle& operator=(CurlEasyHandle const*);
public:
// Reset all options of a libcurl session handle.
void reset(void) { llassert(!mActiveMultiHandle); curl_easy_reset(mEasyHandle); }
public:
// Reset all options of a libcurl session handle.
void reset(void) { llassert(!mActiveMultiHandle); curl_easy_reset(mEasyHandle); }
// Set options for a curl easy handle.
DECLARE_SETOPT(long);
DECLARE_SETOPT(long long);
DECLARE_SETOPT(void const*);
DECLARE_SETOPT(curl_debug_callback);
DECLARE_SETOPT(curl_write_callback);
//DECLARE_SETOPT(curl_read_callback); Same type as curl_write_callback
DECLARE_SETOPT(curl_ssl_ctx_callback);
DECLARE_SETOPT(curl_progress_callback);
DECLARE_SETOPT(curl_conv_callback);
// Set options for a curl easy handle.
DECLARE_SETOPT(long);
DECLARE_SETOPT(long long);
DECLARE_SETOPT(void const*);
DECLARE_SETOPT(curl_debug_callback);
DECLARE_SETOPT(curl_write_callback);
//DECLARE_SETOPT(curl_read_callback); Same type as curl_write_callback
DECLARE_SETOPT(curl_ssl_ctx_callback);
DECLARE_SETOPT(curl_conv_callback);
#if 0 // Not used by the viewer.
DECLARE_SETOPT(curl_seek_callback);
DECLARE_SETOPT(curl_ioctl_callback);
DECLARE_SETOPT(curl_sockopt_callback);
DECLARE_SETOPT(curl_opensocket_callback);
DECLARE_SETOPT(curl_closesocket_callback);
DECLARE_SETOPT(curl_sshkeycallback);
DECLARE_SETOPT(curl_chunk_bgn_callback);
DECLARE_SETOPT(curl_chunk_end_callback);
DECLARE_SETOPT(curl_fnmatch_callback);
DECLARE_SETOPT(curl_progress_callback);
DECLARE_SETOPT(curl_seek_callback);
DECLARE_SETOPT(curl_ioctl_callback);
DECLARE_SETOPT(curl_sockopt_callback);
DECLARE_SETOPT(curl_opensocket_callback);
DECLARE_SETOPT(curl_closesocket_callback);
DECLARE_SETOPT(curl_sshkeycallback);
DECLARE_SETOPT(curl_chunk_bgn_callback);
DECLARE_SETOPT(curl_chunk_end_callback);
DECLARE_SETOPT(curl_fnmatch_callback);
#endif
// Automatically cast int types to a long. Note that U32/S32 are int and
// that you can overload int and long even if they have the same size.
CURLcode setopt(CURLoption option, U32 parameter) { return setopt(option, (long)parameter); }
CURLcode setopt(CURLoption option, S32 parameter) { return setopt(option, (long)parameter); }
// Automatically cast int types to a long. Note that U32/S32 are int and
// that you can overload int and long even if they have the same size.
CURLcode setopt(CURLoption option, U32 parameter) { return setopt(option, (long)parameter); }
CURLcode setopt(CURLoption option, S32 parameter) { return setopt(option, (long)parameter); }
// Clone a libcurl session handle using all the options previously set.
//CurlEasyHandle(CurlEasyHandle const& orig);
// Clone a libcurl session handle using all the options previously set.
//CurlEasyHandle(CurlEasyHandle const& orig);
// URL encode/decode the given string.
char* escape(char* url, int length);
char* unescape(char* url, int inlength , int* outlength);
// URL encode/decode the given string.
char* escape(char* url, int length);
char* unescape(char* url, int inlength , int* outlength);
// Extract information from a curl handle.
private:
CURLcode getinfo_priv(CURLINFO info, void* data);
public:
// The rest are inlines to provide some type-safety.
CURLcode getinfo(CURLINFO info, char** data) { return getinfo_priv(info, data); }
CURLcode getinfo(CURLINFO info, curl_slist** data) { return getinfo_priv(info, data); }
CURLcode getinfo(CURLINFO info, double* data) { return getinfo_priv(info, data); }
CURLcode getinfo(CURLINFO info, long* data) { return getinfo_priv(info, data); }
// Extract information from a curl handle.
private:
CURLcode getinfo_priv(CURLINFO info, void* data);
public:
// The rest are inlines to provide some type-safety.
CURLcode getinfo(CURLINFO info, char** data) { return getinfo_priv(info, data); }
CURLcode getinfo(CURLINFO info, curl_slist** data) { return getinfo_priv(info, data); }
CURLcode getinfo(CURLINFO info, double* data) { return getinfo_priv(info, data); }
CURLcode getinfo(CURLINFO info, long* data) { return getinfo_priv(info, data); }
#ifdef __LP64__ // sizeof(long) > sizeof(int) ?
// Overload for integer types that are too small (libcurl demands a long).
CURLcode getinfo(CURLINFO info, S32* data) { long ldata; CURLcode res = getinfo_priv(info, &ldata); *data = static_cast<S32>(ldata); return res; }
CURLcode getinfo(CURLINFO info, U32* data) { long ldata; CURLcode res = getinfo_priv(info, &ldata); *data = static_cast<U32>(ldata); return res; }
// Overload for integer types that are too small (libcurl demands a long).
CURLcode getinfo(CURLINFO info, S32* data) { long ldata; CURLcode res = getinfo_priv(info, &ldata); *data = static_cast<S32>(ldata); return res; }
CURLcode getinfo(CURLINFO info, U32* data) { long ldata; CURLcode res = getinfo_priv(info, &ldata); *data = static_cast<U32>(ldata); return res; }
#else // sizeof(long) == sizeof(int)
CURLcode getinfo(CURLINFO info, S32* data) { return getinfo_priv(info, reinterpret_cast<long*>(data)); }
CURLcode getinfo(CURLINFO info, U32* data) { return getinfo_priv(info, reinterpret_cast<long*>(data)); }
CURLcode getinfo(CURLINFO info, S32* data) { return getinfo_priv(info, reinterpret_cast<long*>(data)); }
CURLcode getinfo(CURLINFO info, U32* data) { return getinfo_priv(info, reinterpret_cast<long*>(data)); }
#endif
// Perform a file transfer (blocking).
CURLcode perform(void);
// Pause and unpause a connection.
CURLcode pause(int bitmask);
// Perform a file transfer (blocking).
CURLcode perform(void);
// Pause and unpause a connection.
CURLcode pause(int bitmask);
// Called when a request is queued for removal. In that case a race between the actual removal
// and revoking of the callbacks is harmless (and happens for the raw non-statemachine version).
void remove_queued(void) { mQueuedForRemoval = true; }
// In case it's added after being removed.
void add_queued(void) { mQueuedForRemoval = false; }
// Called when a request is queued for removal. In that case a race between the actual removal
// and revoking of the callbacks is harmless (and happens for the raw non-statemachine version).
void remove_queued(void) { mQueuedForRemoval = true; }
// In case it's added after being removed.
void add_queued(void) { mQueuedForRemoval = false; }
private:
CURL* mEasyHandle;
CURLM* mActiveMultiHandle;
char* mErrorBuffer;
AIPostFieldPtr mPostField; // This keeps the POSTFIELD data alive for as long as the easy handle exists.
bool mQueuedForRemoval; // Set if the easy handle is (probably) added to the multi handle, but is queued for removal.
private:
CURL* mEasyHandle;
CURLM* mActiveMultiHandle;
char* mErrorBuffer;
AIPostFieldPtr mPostField; // This keeps the POSTFIELD data alive for as long as the easy handle exists.
bool mQueuedForRemoval; // Set if the easy handle is (probably) added to the multi handle, but is queued for removal.
#ifdef SHOW_ASSERT
public:
bool mRemovedPerCommand; // Set if mActiveMultiHandle was reset as per command from the main thread.
public:
bool mRemovedPerCommand; // Set if mActiveMultiHandle was reset as per command from the main thread.
#endif
private:
// This should only be called from MultiHandle; add/remove an easy handle to/from a multi handle.
friend class curlthread::MultiHandle;
CURLMcode add_handle_to_multi(AICurlEasyRequest_wat& curl_easy_request_w, CURLM* multi_handle);
CURLMcode remove_handle_from_multi(AICurlEasyRequest_wat& curl_easy_request_w, CURLM* multi_handle);
private:
// This should only be called from MultiHandle; add/remove an easy handle to/from a multi handle.
friend class curlthread::MultiHandle;
CURLMcode add_handle_to_multi(AICurlEasyRequest_wat& curl_easy_request_w, CURLM* multi_handle);
CURLMcode remove_handle_from_multi(AICurlEasyRequest_wat& curl_easy_request_w, CURLM* multi_handle);
public:
// Returns true if this easy handle was added to a curl multi handle.
bool active(void) const { return mActiveMultiHandle; }
public:
// Returns true if this easy handle was added to a curl multi handle.
bool active(void) const { return mActiveMultiHandle; }
// Returns true when it is expected that the parent will revoke callbacks before the curl
// easy handle is removed from the multi handle; that usually happens when an external
// error demands termination of the request (ie, an expiration).
bool no_warning(void) const { return mQueuedForRemoval || LLApp::isExiting(); }
// Returns true when it is expected that the parent will revoke callbacks before the curl
// easy handle is removed from the multi handle; that usually happens when an external
// error demands termination of the request (ie, an expiration).
bool no_warning(void) const { return mQueuedForRemoval || LLApp::isExiting(); }
// Used for debugging purposes.
bool operator==(CURL* easy_handle) const { return mEasyHandle == easy_handle; }
// Used for debugging purposes.
bool operator==(CURL* easy_handle) const { return mEasyHandle == easy_handle; }
private:
// Call this prior to every curl_easy function whose return value is passed to check_easy_code.
void setErrorBuffer(void);
private:
// Call this prior to every curl_easy function whose return value is passed to check_easy_code.
void setErrorBuffer(void);
static void handle_easy_error(CURLcode code);
static void handle_easy_error(CURLcode code);
// Always first call setErrorBuffer()!
static inline CURLcode check_easy_code(CURLcode code)
{
Stats::easy_calls++;
if (code != CURLE_OK)
handle_easy_error(code);
return code;
}
// Always first call setErrorBuffer()!
static inline CURLcode check_easy_code(CURLcode code)
{
Stats::easy_calls++;
if (code != CURLE_OK)
handle_easy_error(code);
return code;
}
protected:
// Return the underlying curl easy handle.
CURL* getEasyHandle(void) const { return mEasyHandle; }
protected:
// Return the underlying curl easy handle.
CURL* getEasyHandle(void) const { return mEasyHandle; }
// Keep POSTFIELD data alive.
void setPostField(AIPostFieldPtr const& post_field_ptr) { mPostField = post_field_ptr; }
// Keep POSTFIELD data alive.
void setPostField(AIPostFieldPtr const& post_field_ptr) { mPostField = post_field_ptr; }
private:
// Return, and possibly create, the curl (easy) error buffer used by the current thread.
static char* getTLErrorBuffer(void);
};
private:
// Return, and possibly create, the curl (easy) error buffer used by the current thread.
static char* getTLErrorBuffer(void);
};
// CurlEasyRequest adds a slightly more powerful interface that can be used
// to set the options on a curl easy handle.
//
// Calling sendRequest() will then connect to the given URL and perform
// the data exchange. Use getResult() to determine if an error occurred.
//
// Note that the life cycle of a CurlEasyRequest is controlled by AICurlEasyRequest:
// a CurlEasyRequest is only ever created as base class of a ThreadSafeCurlEasyRequest,
// which is only created by creating a AICurlEasyRequest. When the last copy of such
// AICurlEasyRequest is deleted, then also the ThreadSafeCurlEasyRequest is deleted
// and the CurlEasyRequest destructed.
class CurlEasyRequest : public CurlEasyHandle {
private:
void setPost_raw(U32 size, char const* data);
public:
void setPost(U32 size) { setPost_raw(size, NULL); }
void setPost(AIPostFieldPtr const& postdata, U32 size);
void setPost(char const* data, U32 size) { setPost(new AIPostField(data), size); }
void setoptString(CURLoption option, std::string const& value);
void addHeader(char const* str);
void addHeaders(AIHTTPHeaders const& headers);
// CurlEasyRequest adds a slightly more powerful interface that can be used
// to set the options on a curl easy handle.
//
// Calling sendRequest() will then connect to the given URL and perform
// the data exchange. Use getResult() to determine if an error occurred.
//
// Note that the life cycle of a CurlEasyRequest is controlled by AICurlEasyRequest:
// a CurlEasyRequest is only ever created as base class of a ThreadSafeCurlEasyRequest,
// which is only created by creating a AICurlEasyRequest. When the last copy of such
// AICurlEasyRequest is deleted, then also the ThreadSafeCurlEasyRequest is deleted
// and the CurlEasyRequest destructed.
class CurlEasyRequest : public CurlEasyHandle {
private:
void setPost_raw(U32 size, char const* data);
public:
void setPost(U32 size) { setPost_raw(size, NULL); }
void setPost(AIPostFieldPtr const& postdata, U32 size);
void setPost(char const* data, U32 size) { setPost(new AIPostField(data), size); }
void setoptString(CURLoption option, std::string const& value);
void addHeader(char const* str);
void addHeaders(AIHTTPHeaders const& headers);
private:
// Callback stubs.
static size_t headerCallback(char* ptr, size_t size, size_t nmemb, void* userdata);
static size_t writeCallback(char* ptr, size_t size, size_t nmemb, void* userdata);
static size_t readCallback(char* ptr, size_t size, size_t nmemb, void* userdata);
static CURLcode SSLCtxCallback(CURL* curl, void* sslctx, void* userdata);
static int progressCallback(void* clientp, double dltotal, double dlnow, double ultotal, double ulnow);
private:
// Callback stubs.
static size_t headerCallback(char* ptr, size_t size, size_t nmemb, void* userdata);
static size_t writeCallback(char* ptr, size_t size, size_t nmemb, void* userdata);
static size_t readCallback(char* ptr, size_t size, size_t nmemb, void* userdata);
static CURLcode SSLCtxCallback(CURL* curl, void* sslctx, void* userdata);
curl_write_callback mHeaderCallback;
void* mHeaderCallbackUserData;
curl_write_callback mWriteCallback;
void* mWriteCallbackUserData;
curl_read_callback mReadCallback;
void* mReadCallbackUserData;
curl_ssl_ctx_callback mSSLCtxCallback;
void* mSSLCtxCallbackUserData;
curl_progress_callback mProgressCallback;
void* mProgressCallbackUserData;
curl_write_callback mHeaderCallback;
void* mHeaderCallbackUserData;
curl_write_callback mWriteCallback;
void* mWriteCallbackUserData;
curl_read_callback mReadCallback;
void* mReadCallbackUserData;
curl_ssl_ctx_callback mSSLCtxCallback;
void* mSSLCtxCallbackUserData;
public:
void setHeaderCallback(curl_write_callback callback, void* userdata);
void setWriteCallback(curl_write_callback callback, void* userdata);
void setReadCallback(curl_read_callback callback, void* userdata);
void setSSLCtxCallback(curl_ssl_ctx_callback callback, void* userdata);
void setProgressCallback(curl_progress_callback callback, void* userdata);
public:
void setHeaderCallback(curl_write_callback callback, void* userdata);
void setWriteCallback(curl_write_callback callback, void* userdata);
void setReadCallback(curl_read_callback callback, void* userdata);
void setSSLCtxCallback(curl_ssl_ctx_callback callback, void* userdata);
// Call this if the set callbacks are about to be invalidated.
void revokeCallbacks(void);
// Call this if the set callbacks are about to be invalidated.
void revokeCallbacks(void);
// Reset everything to the state it was in when this object was just created.
void resetState(void);
// Reset everything to the state it was in when this object was just created.
void resetState(void);
private:
// Called from applyDefaultOptions.
void applyProxySettings(void);
private:
// Called from applyDefaultOptions.
void applyProxySettings(void);
// Used in applyProxySettings.
static CURLcode curlCtxCallback(CURL* curl, void* sslctx, void* parm);
// Used in applyProxySettings.
static CURLcode curlCtxCallback(CURL* curl, void* sslctx, void* parm);
public:
// Set default options that we want applied to all curl easy handles.
void applyDefaultOptions(void);
public:
// Set default options that we want applied to all curl easy handles.
void applyDefaultOptions(void);
// Prepare the request for adding it to a multi session, or calling perform.
// This actually adds the headers that were collected with addHeader.
void finalizeRequest(std::string const& url, AIHTTPTimeoutPolicy const& policy, AICurlEasyRequestStateMachine* state_machine);
// Prepare the request for adding it to a multi session, or calling perform.
// This actually adds the headers that were collected with addHeader.
void finalizeRequest(std::string const& url, AIHTTPTimeoutPolicy const& policy, AICurlEasyRequestStateMachine* state_machine);
// Called by MultiHandle::add_easy_request when the easy handle is actually being added to the multi handle.
void timeout_add_easy_request(void);
//-------------------------------------------
// Timeout administration events:
// Called when data was written the first time, meaning that the connection succeeded.
void timeout_connected(void);
// Called by MultiHandle::add_easy_request when the easy handle is actually being added to the multi handle.
void timeout_add_easy_request(void);
// Called when data is sent.
void timeout_data_sent(size_t n);
// Called after sending all headers, when body data is written the first time.
void timeout_connected(void);
// Called when data is received.
void timeout_data_received(size_t n);
// Called when everything we had to send to the server has been sent.
void timeout_upload_finished(void);
// Called immediately before done() after curl finished, with code.
void timeout_done(CURLcode code);
// Called when data is sent. Returns true if transfer timed out.
bool timeout_data_sent(size_t n);
// Progress meter callback.
static int timeout_progress(void* userdata, double dltotal, double dlnow, double ultotal, double ulnow);
int timeout_progress(double dlnow, double ulnow);
// Called when data is received. Returns true if transfer timed out.
bool timeout_data_received(size_t n);
// Called by MultiHandle::check_run_count() to store result code that is returned by getResult.
void store_result(CURLcode result) { mResult = result; }
// Called immediately before done() after curl finished, with code.
void timeout_done(CURLcode code);
// Called by MultiHandle::check_run_count() when the curl easy handle is done.
void done(AICurlEasyRequest_wat& curl_easy_request_w) { finished(curl_easy_request_w); }
// Accessor.
bool timeout_has_stalled(void) const { return mTimeoutStalled < sTimeoutClockCount; }
// Called by MultiHandle::check_run_count() to fill info with the transfer info.
void getTransferInfo(AICurlInterface::TransferInfo* info);
private:
// (Re)start low speed transer rate detection.
void timeout_reset_lowspeed(void);
// If result != CURLE_FAILED_INIT then also info was filled.
void getResult(CURLcode* result, AICurlInterface::TransferInfo* info = NULL);
// Common low speed detection, Called from timeout_data_sent or timeout_data_received.
bool timeout_lowspeed(size_t bytes);
private:
curl_slist* mHeaders;
bool mRequestFinalized;
AICurlEasyHandleEvents* mEventsTarget;
CURLcode mResult;
// End of timeout stuff
//-------------------------------------------
// AIFIXME: put all timeout stuff in it's own class.
AIHTTPTimeoutPolicy const* mTimeoutPolicy;
std::string mTimeoutLowercaseHostname; // Lowercase hostname (canonicalized) extracted from the url.
bool mTimeoutConnected; // Set if we succeeded to connect and are transfering data.
#ifdef CWDEBUG
U64 mTimeout_connect_time; // Time at which mTimeoutConnected was set to true (for debugging purposes only).
#endif
bool mTimeoutWaitingForReply; // Set after we finished sending data to the server and are waiting for the reply.
bool mTimeoutNothingReceivedYet; // Set when connected, reset when the HTML reply header from the server is received.
U64 mTimeout_progress_time; // The (last) time timeout_progress() was called, in microseconds.
U64 mTransferOK; // The last time the transfer rate was OK.
double mTimeout_dlnow; // Number of downloaded bytes so far, as per last call to timeout_progress.
double mTimeout_ulnow; // Number of uploaded bytes so far, as per last call to timeout_progress.
public:
// Called by MultiHandle::check_run_count() to store result code that is returned by getResult.
void store_result(CURLcode result) { mResult = result; }
private:
// This class may only be created by constructing a ThreadSafeCurlEasyRequest.
friend class ThreadSafeCurlEasyRequest;
// Throws AICurlNoEasyHandle.
// Called by MultiHandle::check_run_count() when the curl easy handle is done.
void done(AICurlEasyRequest_wat& curl_easy_request_w) { finished(curl_easy_request_w); }
// Called by MultiHandle::check_run_count() to fill info with the transfer info.
void getTransferInfo(AICurlInterface::TransferInfo* info);
// If result != CURLE_FAILED_INIT then also info was filled.
void getResult(CURLcode* result, AICurlInterface::TransferInfo* info = NULL);
private:
curl_slist* mHeaders;
bool mRequestFinalized;
AICurlEasyHandleEvents* mEventsTarget;
CURLcode mResult;
// AIFIXME: put all timeout stuff in it's own class.
AIHTTPTimeoutPolicy const* mTimeoutPolicy;
std::string mTimeoutLowercaseHostname; // Lowercase hostname (canonicalized) extracted from the url.
std::vector<U32> mTimeoutBuckets; // An array with the number of bytes transfered in each second.
U16 mTimeoutBucket; // The bucket corresponding to mTimeoutLastSecond.
bool mTimeoutNothingReceivedYet; // Set when connected, reset when the HTML reply header from the server is received.
bool mTimeoutLowSpeedOn; // Set while uploading or downloading data.
S32 mTimeoutLastSecond; // The time at which timeout_lowspeed was last called, in seconds since mTimeoutLowSpeedClock.
U32 mTimeoutTotalBytes; // The sum of all bytes in mTimeoutBuckets.
U64 mTimeoutLowSpeedClock; // Clock count at which low speed transfer rate started.
U64 mTimeoutStalled; // The clock count at which this transaction is considered to be stalling.
public:
static F64 const sTimeoutClockWidth; // Time between two clock ticks in seconds.
static U64 sTimeoutClockCount; // Clock count used during one loop of the main loop.
private:
// This class may only be created by constructing a ThreadSafeCurlEasyRequest.
friend class ThreadSafeCurlEasyRequest;
// Throws AICurlNoEasyHandle.
CurlEasyRequest(void) :
mHeaders(NULL), mRequestFinalized(false), mEventsTarget(NULL), mResult(CURLE_FAILED_INIT), mTimeoutPolicy(NULL)
{ applyDefaultOptions(); }

View File

@@ -30,7 +30,7 @@
#include "linden_common.h"
#include "aicurlthread.h"
#include "lltimer.h" // ms_sleep
#include "lltimer.h" // ms_sleep, get_clock_count
#include <sys/types.h>
#if !LL_WINDOWS
#include <sys/select.h>
@@ -699,7 +699,7 @@ bool MergeIterator::next(curl_socket_t& fd_out, int& ev_bitmask_out)
class CurlSocketInfo
{
public:
CurlSocketInfo(MultiHandle& multi_handle, CURL* easy, curl_socket_t s, int action);
CurlSocketInfo(MultiHandle& multi_handle, CURL* easy, curl_socket_t s, int action, ThreadSafeCurlEasyRequest* lockobj);
~CurlSocketInfo();
void set_action(int action);
@@ -709,11 +709,13 @@ class CurlSocketInfo
CURL const* mEasy;
curl_socket_t mSocketFd;
int mAction;
AICurlEasyRequest mEasyRequest;
};
CurlSocketInfo::CurlSocketInfo(MultiHandle& multi_handle, CURL* easy, curl_socket_t s, int action) :
mMultiHandle(multi_handle), mEasy(easy), mSocketFd(s), mAction(CURL_POLL_NONE)
CurlSocketInfo::CurlSocketInfo(MultiHandle& multi_handle, CURL* easy, curl_socket_t s, int action, ThreadSafeCurlEasyRequest* lockobj) :
mMultiHandle(multi_handle), mEasy(easy), mSocketFd(s), mAction(CURL_POLL_NONE), mEasyRequest(lockobj)
{
llassert(*AICurlEasyRequest_wat(*mEasyRequest) == easy);
mMultiHandle.assign(s, this);
llassert(!mMultiHandle.mReadPollSet->contains(s));
llassert(!mMultiHandle.mWritePollSet->contains(s));
@@ -741,7 +743,12 @@ void CurlSocketInfo::set_action(int action)
if ((action & CURL_POLL_OUT))
mMultiHandle.mWritePollSet->add(mSocketFd);
else
{
mMultiHandle.mWritePollSet->remove(mSocketFd);
// If CURL_POLL_OUT is removed, we have nothing more to send apparently.
AICurlEasyRequest_wat curl_easy_request_w(*mEasyRequest);
curl_easy_request_w->timeout_upload_finished(); // Update timeout administration.
}
}
}
@@ -1302,9 +1309,12 @@ void AICurlThread::run(void)
llwarns << "select() failed: " << errno << ", " << strerror(errno) << llendl;
continue;
}
else if (ready == 0)
// Clock count used for timeouts.
CurlEasyRequest::sTimeoutClockCount = get_clock_count();
if (ready == 0)
{
multi_handle_w->socket_action(CURL_SOCKET_TIMEOUT, 0);
multi_handle_w->handle_stalls();
}
else
{
@@ -1362,6 +1372,18 @@ MultiHandle::~MultiHandle()
delete mReadPollSet;
}
void MultiHandle::handle_stalls(void)
{
for(addedEasyRequests_type::iterator iter = mAddedEasyRequests.begin(); iter != mAddedEasyRequests.end(); iter = mAddedEasyRequests.begin())
{
if (AICurlEasyRequest_wat(**iter)->timeout_has_stalled())
{
Dout(dc::curl, "MultiHandle::handle_stalls(): Easy request stalled! [" << (void*)iter->get_ptr().get() << "]");
remove_easy_request(*iter, false);
}
}
}
#if defined(CWDEBUG) || defined(DEBUG_CURLIO)
#undef AI_CASE_RETURN
#define AI_CASE_RETURN(x) do { case x: return #x; } while(0)
@@ -1398,7 +1420,10 @@ int MultiHandle::socket_callback(CURL* easy, curl_socket_t s, int action, void*
{
if (!sock_info)
{
sock_info = new CurlSocketInfo(self, easy, s, action);
ThreadSafeCurlEasyRequest* ptr;
CURLcode rese = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ptr);
llassert_always(rese == CURLE_OK);
sock_info = new CurlSocketInfo(self, easy, s, action, ptr);
}
else
{

View File

@@ -95,6 +95,9 @@ class MultiHandle : public CurlMultiHandle
// This is called before sleeping, after calling (one or more times) socket_action.
void check_run_count(void);
// Called from the main loop every time select() timed out.
void handle_stalls(void);
public:
//-----------------------------------------------------------------------------
// Curl socket administration: