From 125a10bb447b8ab693d6c526b296ea95467e1ab9 Mon Sep 17 00:00:00 2001 From: Aleric Inglewood Date: Wed, 4 Jul 2012 00:10:43 +0200 Subject: [PATCH] Code hardening, review, bug fixes, documentation, curl stats and cleanup. Bug fixes: AICurlEasyRequestStateMachine didn't delete itself. curl_multi_socket_action calls were made for potentional removed sockets. The curl thread wasn't terminated. --- indra/llmessage/aicurl.cpp | 107 ++++--- indra/llmessage/aicurlprivate.h | 42 ++- indra/llmessage/aicurlthread.cpp | 276 ++++++++++++------ indra/llmessage/aicurlthread.h | 2 +- indra/newview/llxmlrpctransaction.cpp | 14 +- .../aicurleasyrequeststatemachine.cpp | 2 + 6 files changed, 300 insertions(+), 143 deletions(-) diff --git a/indra/llmessage/aicurl.cpp b/indra/llmessage/aicurl.cpp index 5a012c0e7..a2ea765f1 100644 --- a/indra/llmessage/aicurl.cpp +++ b/indra/llmessage/aicurl.cpp @@ -259,10 +259,9 @@ static unsigned int encoded_version(int major, int minor, int patch) // External API // -namespace AICurlInterface { - #undef AICurlPrivate -using AICurlPrivate::check_easy_code; + +namespace AICurlInterface { // MAIN-THREAD void initCurl(F32 curl_request_timeout, S32 max_number_handles) @@ -270,7 +269,7 @@ void initCurl(F32 curl_request_timeout, S32 max_number_handles) DoutEntering(dc::curl, "AICurlInterface::initCurl(" << curl_request_timeout << ", " << max_number_handles << ")"); llassert(LLThread::getRunning() == 0); // We must not call curl_global_init unless we are the only thread. - CURLcode res = check_easy_code(curl_global_init(CURL_GLOBAL_ALL)); + CURLcode res = curl_global_init(CURL_GLOBAL_ALL); if (res != CURLE_OK) { llerrs << "curl_global_init(CURL_GLOBAL_ALL) failed." << llendl; @@ -346,10 +345,12 @@ void initCurl(F32 curl_request_timeout, S32 max_number_handles) // MAIN-THREAD void cleanupCurl(void) { + using AICurlPrivate::stopCurlThread; using AICurlPrivate::curlThreadIsRunning; DoutEntering(dc::curl, "AICurlInterface::cleanupCurl()"); + stopCurlThread(); ssl_cleanup(); llassert(LLThread::getRunning() <= (curlThreadIsRunning() ? 1 : 0)); // We must not call curl_global_cleanup unless we are the only thread left. @@ -495,30 +496,32 @@ void intrusive_ptr_release(Responder* responder) namespace AICurlPrivate { -// THREAD-SAFE -CURLcode check_easy_code(CURLcode code) +//static +LLAtomicU32 Stats::easy_calls; +LLAtomicU32 Stats::easy_errors; +LLAtomicU32 Stats::easy_init_calls; +LLAtomicU32 Stats::easy_init_errors; +LLAtomicU32 Stats::easy_cleanup_calls; +LLAtomicU32 Stats::multi_calls; +LLAtomicU32 Stats::multi_errors; + +//static +void Stats::print(void) { - if (code != CURLE_OK) - { - char* error_buffer = LLThreadLocalData::tldata().mCurlErrorBuffer; - llinfos << "curl easy error detected: " << curl_easy_strerror(code); - if (error_buffer && *error_buffer != '\0') - { - llcont << ": " << error_buffer; - } - llcont << llendl; - } - return code; + llinfos << "====== CURL STATS ======" << llendl; + llinfos << " Curl multi errors/calls: " << std::dec << multi_errors << "/" << multi_calls << llendl; + llinfos << " Curl easy errors/calls: " << std::dec << easy_errors << "/" << easy_calls << llendl; + llinfos << " curl_easy_init() errors/calls: " << std::dec << easy_init_errors << "/" << easy_init_calls << llendl; + llinfos << " Current number of curl easy handles: " << std::dec << (easy_init_calls - easy_init_errors - easy_cleanup_calls) << llendl; + llinfos << "=== END OF CURL STATS ===" << llendl; } // THREAD-SAFE -CURLMcode check_multi_code(CURLMcode code) +void handle_multi_error(CURLMcode code) { - if (code != CURLM_OK) - { - llinfos << "curl multi error detected: " << curl_multi_strerror(code) << llendl; - } - return code; + Stats::multi_errors++; + llinfos << "curl multi error detected: " << curl_multi_strerror(code) << + "; (errors/calls = " << Stats::multi_errors << "/" << Stats::multi_calls << ")" << llendl; } //============================================================================= @@ -528,33 +531,46 @@ CURLMcode check_multi_code(CURLMcode code) //----------------------------------------------------------------------------- // CurlEasyHandle -LLAtomicU32 CurlEasyHandle::sTotalEasyHandles; +// THREAD-SAFE +//static +void CurlEasyHandle::handle_easy_error(CURLcode code) +{ + char* error_buffer = LLThreadLocalData::tldata().mCurlErrorBuffer; + llinfos << "curl easy error detected: " << curl_easy_strerror(code); + if (error_buffer && *error_buffer != '\0') + { + llcont << ": " << error_buffer; + } + Stats::easy_errors++; + llcont << "; (errors/calls = " << Stats::easy_errors << "/" << Stats::easy_calls << ")" << llendl; +} // Throws AICurlNoEasyHandle. CurlEasyHandle::CurlEasyHandle(void) : mActiveMultiHandle(NULL), mErrorBuffer(NULL) { mEasyHandle = curl_easy_init(); #if 0 - //FIXME: for debugging, throw once every 10 times. - static int c = 0; - if (++c % 10 == 5) + // Fake curl_easy_init() failures: throw once every 10 times (for debugging purposes). + static int count = 0; + if (mEasyHandle && (++count % 10) == 5) { curl_easy_cleanup(mEasyHandle); mEasyHandle = NULL; } #endif + Stats::easy_init_calls++; if (!mEasyHandle) { + Stats::easy_init_errors++; throw AICurlNoEasyHandle("curl_easy_init() returned NULL"); } - sTotalEasyHandles++; } CurlEasyHandle::~CurlEasyHandle() { llassert(!mActiveMultiHandle); curl_easy_cleanup(mEasyHandle); - --sTotalEasyHandles; + Stats::easy_cleanup_calls++; } //static @@ -573,8 +589,13 @@ void CurlEasyHandle::setErrorBuffer(void) char* error_buffer = getTLErrorBuffer(); if (mErrorBuffer != error_buffer) { - curl_easy_setopt(mEasyHandle, CURLOPT_ERRORBUFFER, error_buffer); mErrorBuffer = error_buffer; + CURLcode res = curl_easy_setopt(mEasyHandle, CURLOPT_ERRORBUFFER, error_buffer); + if (res != CURLE_OK) + { + llwarns << "curl_easy_setopt(" << (void*)mEasyHandle << "CURLOPT_ERRORBUFFER, " << (void*)error_buffer << ") failed with error " << res << llendl; + mErrorBuffer = NULL; + } } } @@ -727,27 +748,29 @@ void CurlEasyRequest::setSSLCtxCallback(curl_ssl_ctx_callback callback, void* us setopt(CURLOPT_SSL_CTX_DATA, userdata ? this : NULL); } +#define llmaybewarns lllog(LLApp::isExiting() ? LLError::LEVEL_INFO : LLError::LEVEL_WARN, NULL, NULL, false) + static size_t noHeaderCallback(char* ptr, size_t size, size_t nmemb, void* userdata) { - llwarns << "Calling noHeaderCallback(); curl session aborted." << llendl; + llmaybewarns << "Calling noHeaderCallback(); curl session aborted." << llendl; return 0; // Cause a CURL_WRITE_ERROR } static size_t noWriteCallback(char* ptr, size_t size, size_t nmemb, void* userdata) { - llwarns << "Calling noWriteCallback(); curl session aborted." << llendl; + llmaybewarns << "Calling noWriteCallback(); curl session aborted." << llendl; return 0; // Cause a CURL_WRITE_ERROR } static size_t noReadCallback(char* ptr, size_t size, size_t nmemb, void* userdata) { - llwarns << "Calling noReadCallback(); curl session aborted." << llendl; + llmaybewarns << "Calling noReadCallback(); curl session aborted." << llendl; return CURL_READFUNC_ABORT; // Cause a CURLE_ABORTED_BY_CALLBACK } static CURLcode noSSLCtxCallback(CURL* curl, void* sslctx, void* parm) { - llwarns << "Calling noSSLCtxCallback(); curl session aborted." << llendl; + llmaybewarns << "Calling noSSLCtxCallback(); curl session aborted." << llendl; return CURLE_ABORTED_BY_CALLBACK; } @@ -765,7 +788,7 @@ void CurlEasyRequest::revokeCallbacks(void) mWriteCallback = &noWriteCallback; mReadCallback = &noReadCallback; mSSLCtxCallback = &noSSLCtxCallback; - if (active()) + if (active() && !LLApp::isExiting()) { llwarns << "Revoking callbacks on a still active CurlEasyRequest object!" << llendl; } @@ -907,14 +930,17 @@ void CurlEasyRequest::applyDefaultOptions(void) //setopt(CURLOPT_DNS_CACHE_TIMEOUT, 0); // Set the CURL options for either SOCKS or HTTP proxy. applyProxySettings(); +#if 0 + // Cause libcurl to print all it's I/O traffic on the debug channel. Debug( if (dc::curl.is_on()) { - setopt(CURLOPT_VERBOSE, 1); // Useful for debugging. + setopt(CURLOPT_VERBOSE, 1); setopt(CURLOPT_DEBUGFUNCTION, &curl_debug_callback); setopt(CURLOPT_DEBUGDATA, this); } ); +#endif } void CurlEasyRequest::finalizeRequest(std::string const& url) @@ -1204,9 +1230,12 @@ LLAtomicU32 CurlMultiHandle::sTotalMultiHandles; CurlMultiHandle::CurlMultiHandle(void) { + DoutEntering(dc::curl, "CurlMultiHandle::CurlMultiHandle() [" << (void*)this << "]."); mMultiHandle = curl_multi_init(); + Stats::multi_calls++; if (!mMultiHandle) { + Stats::multi_errors++; throw AICurlNoMultiHandle("curl_multi_init() returned NULL"); } sTotalMultiHandles++; @@ -1215,7 +1244,11 @@ CurlMultiHandle::CurlMultiHandle(void) CurlMultiHandle::~CurlMultiHandle() { curl_multi_cleanup(mMultiHandle); - --sTotalMultiHandles; + Stats::multi_calls++; + int total = --sTotalMultiHandles; + Dout(dc::curl, "Called CurlMultiHandle::~CurlMultiHandle() [" << (void*)this << "], " << total << " remaining."); + if (total == 0) + Stats::print(); } } // namespace AICurlPrivate diff --git a/indra/llmessage/aicurlprivate.h b/indra/llmessage/aicurlprivate.h index c5843176d..82d4e9c44 100644 --- a/indra/llmessage/aicurlprivate.h +++ b/indra/llmessage/aicurlprivate.h @@ -32,15 +32,29 @@ #define AICURLPRIVATE_H #include +#include "llatomic.h" namespace AICurlPrivate { namespace curlthread { class MultiHandle; } -CURLcode check_easy_code(CURLcode code); -CURLMcode check_multi_code(CURLMcode code); +struct Stats { + static LLAtomicU32 easy_calls; + static LLAtomicU32 easy_errors; + static LLAtomicU32 easy_init_calls; + static LLAtomicU32 easy_init_errors; + static LLAtomicU32 easy_cleanup_calls; + static LLAtomicU32 multi_calls; + static LLAtomicU32 multi_errors; + + static void print(void); +}; + +void handle_multi_error(CURLMcode code); +inline CURLMcode check_multi_code(CURLMcode code) { Stats::multi_calls++; if (code != CURLM_OK) handle_multi_error(code); return code; } bool curlThreadIsRunning(void); void wakeUpCurlThread(void); +void stopCurlThread(void); class ThreadSafeCurlEasyRequest; class ThreadSafeBufferedCurlEasyRequest; @@ -88,7 +102,6 @@ class CurlEasyHandle : public boost::noncopyable, protected AICurlEasyHandleEven CURL* mEasyHandle; CURLM* mActiveMultiHandle; char* mErrorBuffer; - static LLAtomicU32 sTotalEasyHandles; private: // This should only be called from MultiHandle; add/remove an easy handle to/from a multi handle. @@ -97,22 +110,31 @@ class CurlEasyHandle : public boost::noncopyable, protected AICurlEasyHandleEven CURLMcode remove_handle_from_multi(AICurlEasyRequest_wat& curl_easy_request_w, CURLM* multi_handle); public: - // Returns total number of existing CURL* handles (excluding ones created outside this class). - static U32 getTotalEasyHandles(void) { return sTotalEasyHandles; } - // Returns true if this easy handle was added to a curl multi handle. bool active(void) const { return mActiveMultiHandle; } - // Call this prior to every curl_easy function whose return value is passed to check_easy_code. - void setErrorBuffer(void); - // If there was an error code as result, then this returns a human readable error string. // Only valid when setErrorBuffer was called and the curl_easy function returned an error. - std::string getErrorString(void) const { return mErrorBuffer; } + std::string getErrorString(void) const { return mErrorBuffer ? mErrorBuffer : "(null)"; } // Used for debugging purposes. bool operator==(CURL* easy_handle) const { return mEasyHandle == easy_handle; } + private: + // Call this prior to every curl_easy function whose return value is passed to check_easy_code. + void setErrorBuffer(void); + + static void handle_easy_error(CURLcode code); + + // Always first call setErrorBuffer()! + static inline CURLcode check_easy_code(CURLcode code) + { + Stats::easy_calls++; + if (code != CURLE_OK) + handle_easy_error(code); + return code; + } + protected: // Return the underlying curl easy handle. CURL* getEasyHandle(void) const { return mEasyHandle; } diff --git a/indra/llmessage/aicurlthread.cpp b/indra/llmessage/aicurlthread.cpp index 7e51ee5b5..005e4c620 100644 --- a/indra/llmessage/aicurlthread.cpp +++ b/indra/llmessage/aicurlthread.cpp @@ -132,8 +132,10 @@ namespace curlthread { // // mMaxFdSet is the largest filedescriptor in mFdSet or -1 if it is empty. +static size_t const MAXSIZE = std::max(1024, FD_SETSIZE); + // Create an empty PollSet. -PollSet::PollSet(void) : mFileDescriptors(new curl_socket_t [std::max(1024, FD_SETSIZE)]), mSize(std::max(1024, FD_SETSIZE)), +PollSet::PollSet(void) : mFileDescriptors(new curl_socket_t [MAXSIZE]), mNrFds(0), mMaxFd(-1), mNext(0), mMaxFdSet(-1) { FD_ZERO(&mFdSet); @@ -142,7 +144,7 @@ PollSet::PollSet(void) : mFileDescriptors(new curl_socket_t [std::max(1024, FD_S // Add filedescriptor s to the PollSet. void PollSet::add(curl_socket_t s) { - llassert_always(mNrFds < mSize); + llassert_always(mNrFds < MAXSIZE); mFileDescriptors[mNrFds++] = s; mMaxFd = std::max(mMaxFd, s); } @@ -159,17 +161,57 @@ void PollSet::remove(curl_socket_t s) // much clock cycles. Therefore, shift the whole vector // back, keeping it compact and keeping the filedescriptors // in the same order (which is supposedly their priority). + // + // The general case is where mFileDescriptors contains s at an index + // between 0 and mNrFds: + // mNrFds = 6 + // v + // index: 0 1 2 3 4 5 + // a b c s d e + + // This function should never be called unless s is actually in mFileDescriptors, + // as a result of a previous call to PollSet::add(). llassert(mNrFds > 0); + + // Correct mNrFds for when the descriptor is removed. + // Make i 'point' to the last entry. int i = --mNrFds; - int prev = mFileDescriptors[i]; + // i = NrFds = 5 + // v + // index: 0 1 2 3 4 5 + // a b c s d e + int prev = mFileDescriptors[i]; // prev = 'e' int max = -1; for (--i; i >= 0 && prev != s; --i) { - int cur = mFileDescriptors[i]; - mFileDescriptors[i] = prev; - max = std::max(max, prev); - prev = cur; + int cur = mFileDescriptors[i]; // cur = 'd' + mFileDescriptors[i] = prev; // Overwrite 'd' with 'e'. + max = std::max(max, prev); // max is the maximum value in 'i' or higher. + prev = cur; // prev = 'd' + // i NrFds = 5 + // v v + // index: 0 1 2 3 4 + // a b c s e // prev = 'd' + // + // Next loop iteration: cur = 's', overwrite 's' with 'd', prev = 's'; loop terminates. + // i NrFds = 5 + // v v + // index: 0 1 2 3 4 + // a b c d e // prev = 's' } + llassert(prev == s); + // At this point i was decremented once more and points to the element before the old s. + // i NrFds = 5 + // v v + // index: 0 1 2 3 4 + // a b c d e // max = std::max('d', 'e') + + // If mNext pointed to an element before s, it should be left alone. Otherwise, if mNext pointed + // to s it must now point to 'd', or if it pointed beyond 's' it must be decremented by 1. + if (mNext > i + 1) // i + 1 is where s was. + --mNext; + + // If s was the largest file descriptor, we have to update mMaxFd. if (s == mMaxFd) { while (i >= 0) @@ -182,8 +224,10 @@ void PollSet::remove(curl_socket_t s) llassert(mMaxFd < s); llassert((mMaxFd == -1) == (mNrFds == 0)); } - if (mNext == mNrFds) - mNext = 0; + + // ALSO make sure that s is no longer set in mFdSet, or we might confuse libcurl by + // calling curl_multi_socket_action for a socket that it told us to remove. + clr(s); } bool PollSet::contains(curl_socket_t fd) const @@ -220,7 +264,7 @@ refresh_t PollSet::refresh(void) llassert_always(mNext < mNrFds); - // Test if mNrFds is larger than or equal FD_SETSIZE; equal, because we reserve one + // Test if mNrFds is larger than or equal to FD_SETSIZE; equal, because we reserve one // filedescriptor for the wakeup fd: we copy maximal FD_SETSIZE - 1 filedescriptors. // If not then we're going to copy everything so that we can save on CPU cycles // by not calculating mMaxFdSet here. @@ -228,22 +272,9 @@ refresh_t PollSet::refresh(void) { llwarns << "PollSet::reset: More than FD_SETSIZE (" << FD_SETSIZE << ") file descriptors active!" << llendl; // Calculate mMaxFdSet. - int max = -1; - int count = 0; - int end = mNrFds; - int i = mNext; - while (++count < FD_SETSIZE) - { - max = std::max(max, mFileDescriptors[i]); - if (++i == end) - { - if (end == mNext) - break; - end = mNext; - i = 0; - llassert(i < end); // If mNext == 0 then the while loop terminates before we get here. - } - } + // Run over FD_SETSIZE - 1 elements, starting at mNext, wrapping to 0 when we reach the end. + int max = -1, i = mNext, count = 0; + while (++count < FD_SETSIZE) { max = std::max(max, mFileDescriptors[i]); if (++i == mNrFds) i = 0; } mMaxFdSet = max; } else @@ -252,7 +283,6 @@ refresh_t PollSet::refresh(void) mMaxFdSet = mMaxFd; } int count = 0; - int end = mNrFds; int i = mNext; for(;;) { @@ -263,15 +293,13 @@ refresh_t PollSet::refresh(void) } FD_SET(mFileDescriptors[i], &mFdSet); mCopiedFileDescriptors.push_back(mFileDescriptors[i]); - if (++i == end) + if (++i == mNrFds) { + // If we reached the end and start at the beginning, then we copied everything. if (mNext == 0) break; - // If this was true then we got here a second time, which means that we accessed all - // filedescriptors with mNext != 0, but count is still < FD_SETSIZE, which is not - // possible because mNext is only non-zero when mNrFds >= FD_SETSIZE. - llassert(end != mNext); - end = mNext; + // When can only come here if mNrFds >= FD_SETSIZE, hence we can just + // wrap around and terminate on count reaching FD_SETSIZE. i = 0; } } @@ -280,9 +308,29 @@ refresh_t PollSet::refresh(void) // FIXME: This needs a rewrite on Windows, as FD_ISSET is slow there; it would make // more sense to iterate directly over the fd's in mFdSet on Windows. +// +// The API reset(), get() and next() allows one to run over all filedescriptors +// in mFdSet that are set. This works by running only over the filedescriptors +// that were set initially (by the call to refresh()) and then checking if that +// filedescriptor is (still) set in mFdSet. +// +// A call to reset() resets mIter to the beginning, so that get() returns +// the first filedescriptor that is still set. A call to next() progresses +// the iterator to the next set filedescriptor. If get() return -1, then there +// were no more filedescriptors set. +// +// Note that one should never call next() unless get() didn't return -1, so +// the call sequence is: +// refresh(); +// /* reset some or all bits in mFdSet */ +// reset(); +// while (get() != -1) // next(); +// +// Note also that this API is only used by MergeIterator, which wraps it +// and provides a different API to use. + void PollSet::reset(void) { - llassert((mNrFds == 0) == mCopiedFileDescriptors.empty()); if (mCopiedFileDescriptors.empty()) mIter = mCopiedFileDescriptors.end(); else @@ -298,8 +346,6 @@ inline int PollSet::get(void) const return (mIter == mCopiedFileDescriptors.end()) ? -1 : *mIter; } -// FIXME: This needs a rewrite on Windows, as FD_ISSET is slow there; it would make -// more sense to iterate directly over the fd's in mFdSet on Windows. void PollSet::next(void) { llassert(mIter != mCopiedFileDescriptors.end()); // Only call next() if the last call to get() didn't return -1. @@ -308,6 +354,10 @@ void PollSet::next(void) //----------------------------------------------------------------------------- // MergeIterator +// +// This class takes two PollSet's and allows one to run over all filedescriptors +// that are set in one or both poll sets, returning each filedescriptor only +// once, by calling next() until it returns false. class MergeIterator { @@ -343,7 +393,6 @@ bool MergeIterator::next(int& fd_out, int& ev_bitmask_out) fd_out = rfd; ev_bitmask_out = CURL_CSELECT_IN | CURL_CSELECT_OUT; mReadPollSet.next(); - } else if ((unsigned int)rfd < (unsigned int)wfd) // Use and increment smaller one, unless it's -1. { @@ -440,6 +489,9 @@ class AICurlThread : public LLThread // MAIN-THREAD void wakeup_thread(void); + // MAIN-THREAD + void stop_thread(void) { mRunning = false; wakeup_thread(); } + protected: virtual void run(void); void wakeup(AICurlMultiHandle_wat const& multi_handle_w); @@ -453,13 +505,15 @@ class AICurlThread : public LLThread int mWakeUpFd; int mZeroTimeOut; + + volatile bool mRunning; }; // Only the main thread is accessing this. AICurlThread* AICurlThread::sInstance = NULL; // MAIN-THREAD -AICurlThread::AICurlThread(void) : LLThread("AICurlThread"), mWakeUpFd_in(-1), mWakeUpFd(-1), mZeroTimeOut(0) +AICurlThread::AICurlThread(void) : LLThread("AICurlThread"), mWakeUpFd_in(-1), mWakeUpFd(-1), mZeroTimeOut(0), mRunning(true) { create_wakeup_fds(); sInstance = this; @@ -567,7 +621,7 @@ void AICurlThread::wakeup(AICurlMultiHandle_wat const& multi_handle_w) llwarns << "read(3) from mWakeUpFd returned 0, indicating that the pipe on the other end was closed! Shutting down curl thread." << llendl; close(mWakeUpFd); mWakeUpFd = -1; - shutdown(); + mRunning = false; return; } #endif @@ -610,30 +664,29 @@ void AICurlThread::run(void) { DoutEntering(dc::curl, "AICurlThread::run()"); - AICurlMultiHandle_wat multi_handle_w(AICurlMultiHandle::getInstance()); - for(;;) { - refresh_t rres = multi_handle_w->mReadPollSet.refresh(); - refresh_t wres = multi_handle_w->mWritePollSet.refresh(); - fd_set* read_fd_set = multi_handle_w->mReadPollSet.access(); - if (LL_LIKELY(mWakeUpFd != -1)) - FD_SET(mWakeUpFd, read_fd_set); - else if ((rres & empty)) - read_fd_set = NULL; - fd_set* write_fd_set = ((wres & empty)) ? NULL : multi_handle_w->mWritePollSet.access(); - int const max_rfd = std::max(multi_handle_w->mReadPollSet.get_max_fd(), mWakeUpFd); - int const max_wfd = multi_handle_w->mWritePollSet.get_max_fd(); - int nfds = std::max(max_rfd, max_wfd) + 1; - llassert(0 <= nfds && nfds <= FD_SETSIZE); - llassert((max_rfd == -1) == (read_fd_set == NULL) && - (max_wfd == -1) == (write_fd_set == NULL)); // Needed on Windows. - llassert((max_rfd == -1 || multi_handle_w->mReadPollSet.is_set(max_rfd)) && - (max_wfd == -1 || multi_handle_w->mWritePollSet.is_set(max_wfd))); - int ready = 0; - if (LL_UNLIKELY(nfds == 0)) // Only happens when the thread is shutting down. - ms_sleep(1000); - else + AICurlMultiHandle_wat multi_handle_w(AICurlMultiHandle::getInstance()); + while(mRunning) { + // If mRunning is true then we can only get here if mWakeUpFd != -1. + llassert(mWakeUpFd != -1); + // Copy the next batch of file descriptors from the PollSets mFiledescriptors into their mFdSet. + multi_handle_w->mReadPollSet.refresh(); + refresh_t wres = multi_handle_w->mWritePollSet.refresh(); + // Add wake up fd if any, and pass NULL to select() if a set is empty. + fd_set* read_fd_set = multi_handle_w->mReadPollSet.access(); + FD_SET(mWakeUpFd, read_fd_set); + fd_set* write_fd_set = ((wres & empty)) ? NULL : multi_handle_w->mWritePollSet.access(); + // Calculate nfds (ignored on windows). + int const max_rfd = std::max(multi_handle_w->mReadPollSet.get_max_fd(), mWakeUpFd); + int const max_wfd = multi_handle_w->mWritePollSet.get_max_fd(); + int nfds = std::max(max_rfd, max_wfd) + 1; + llassert(0 <= nfds && nfds <= FD_SETSIZE); + llassert((max_rfd == -1) == (read_fd_set == NULL) && + (max_wfd == -1) == (write_fd_set == NULL)); // Needed on Windows. + llassert((max_rfd == -1 || multi_handle_w->mReadPollSet.is_set(max_rfd)) && + (max_wfd == -1 || multi_handle_w->mWritePollSet.is_set(max_wfd))); + int ready = 0; struct timeval timeout; long timeout_ms = multi_handle_w->getTimeOut(); // If no timeout is set, sleep 1 second. @@ -695,37 +748,41 @@ void AICurlThread::run(void) if (ready == -1) last_errno = errno; #endif - } - // Select returns the total number of bits set in each of the fd_set's (upon return), - // or -1 when an error occurred. A value of 0 means that a timeout occurred. - if (ready == -1) - { - llwarns << "select() failed: " << errno << ", " << strerror(errno) << llendl; - continue; - } - else if (ready == 0) - { - multi_handle_w->socket_action(CURL_SOCKET_TIMEOUT, 0); - } - else - { - if (multi_handle_w->mReadPollSet.is_set(mWakeUpFd)) + // Select returns the total number of bits set in each of the fd_set's (upon return), + // or -1 when an error occurred. A value of 0 means that a timeout occurred. + if (ready == -1) { - wakeup(multi_handle_w); - --ready; + llwarns << "select() failed: " << errno << ", " << strerror(errno) << llendl; + continue; } - MergeIterator iter(multi_handle_w->mReadPollSet, multi_handle_w->mWritePollSet); - int fd, ev_bitmask; - while (ready > 0 && iter.next(fd, ev_bitmask)) + else if (ready == 0) { - ready -= (ev_bitmask == (CURL_CSELECT_IN|CURL_CSELECT_OUT)) ? 2 : 1; - multi_handle_w->socket_action(fd, ev_bitmask); - llassert(ready >= 0); + multi_handle_w->socket_action(CURL_SOCKET_TIMEOUT, 0); } - llassert(ready == 0); + else + { + if (multi_handle_w->mReadPollSet.is_set(mWakeUpFd)) + { + // Process commands from main-thread. This can add or remove filedescriptors from the poll sets. + wakeup(multi_handle_w); + --ready; + } + // Handle all active filedescriptors. + MergeIterator iter(multi_handle_w->mReadPollSet, multi_handle_w->mWritePollSet); + int fd, ev_bitmask; + while (ready > 0 && iter.next(fd, ev_bitmask)) + { + ready -= (ev_bitmask == (CURL_CSELECT_IN|CURL_CSELECT_OUT)) ? 2 : 1; + multi_handle_w->socket_action(fd, ev_bitmask); + llassert(ready >= 0); + } + // Should have handled them all. + llassert(ready == 0); + } + multi_handle_w->check_run_count(); } - multi_handle_w->check_run_count(); } + AICurlMultiHandle::destroyInstance(); } //----------------------------------------------------------------------------- @@ -733,10 +790,10 @@ void AICurlThread::run(void) MultiHandle::MultiHandle(void) : mHandleAddedOrRemoved(false), mPrevRunningHandles(0), mRunningHandles(0), mTimeOut(-1) { - curl_multi_setopt(mMultiHandle, CURLMOPT_SOCKETFUNCTION, &MultiHandle::socket_callback); - curl_multi_setopt(mMultiHandle, CURLMOPT_SOCKETDATA, this); - curl_multi_setopt(mMultiHandle, CURLMOPT_TIMERFUNCTION, &MultiHandle::timer_callback); - curl_multi_setopt(mMultiHandle, CURLMOPT_TIMERDATA, this); + check_multi_code(curl_multi_setopt(mMultiHandle, CURLMOPT_SOCKETFUNCTION, &MultiHandle::socket_callback)); + check_multi_code(curl_multi_setopt(mMultiHandle, CURLMOPT_SOCKETDATA, this)); + check_multi_code(curl_multi_setopt(mMultiHandle, CURLMOPT_TIMERFUNCTION, &MultiHandle::timer_callback)); + check_multi_code(curl_multi_setopt(mMultiHandle, CURLMOPT_TIMERDATA, this)); } MultiHandle::~MultiHandle() @@ -818,7 +875,13 @@ CURLMcode MultiHandle::assign(curl_socket_t sockfd, void* sockptr) CURLMsg const* MultiHandle::info_read(int* msgs_in_queue) const { - return curl_multi_info_read(mMultiHandle, msgs_in_queue); + CURLMsg const* ret = curl_multi_info_read(mMultiHandle, msgs_in_queue); + // NULL could be an error, but normally it isn't, so don't print anything and + // never increment Stats::multi_errors. However, lets just increment multi_calls + // when it certainly wasn't an error... + if (ret) + Stats::multi_calls++; + return ret; } CURLMcode MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request) @@ -863,7 +926,8 @@ void MultiHandle::check_run_count(void) { CURL* easy = msg->easy_handle; ThreadSafeCurlEasyRequest* ptr; - curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ptr); + CURLcode rese = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ptr); + llassert_always(rese == CURLE_OK); AICurlEasyRequest easy_request(ptr); llassert(*AICurlEasyRequest_wat(*easy_request) == easy); // Store the result and transfer info in the easy handle. @@ -904,6 +968,15 @@ void MultiHandle::check_run_count(void) } // namespace curlthread } // namespace AICurlPrivate +//static +void AICurlMultiHandle::destroyInstance(void) +{ + LLThreadLocalData& tldata = LLThreadLocalData::tldata(); + Dout(dc::curl, "Destroying AICurlMultiHandle [" << (void*)tldata.mCurlMultiHandle << "] for thread \"" << tldata.mName << "\"."); + delete tldata.mCurlMultiHandle; + tldata.mCurlMultiHandle = NULL; +} + //============================================================================= // MAIN-THREAD (needing to access the above declarations). @@ -913,8 +986,8 @@ AICurlMultiHandle& AICurlMultiHandle::getInstance(void) LLThreadLocalData& tldata = LLThreadLocalData::tldata(); if (!tldata.mCurlMultiHandle) { - llinfos << "Creating AICurlMultiHandle for thread \"" << tldata.mName << "\"." << llendl; tldata.mCurlMultiHandle = new AICurlMultiHandle; + Dout(dc::curl, "Created AICurlMultiHandle [" << (void*)tldata.mCurlMultiHandle << "] for thread \"" << tldata.mName << "\"."); } return *static_cast(tldata.mCurlMultiHandle); } @@ -934,6 +1007,21 @@ void wakeUpCurlThread(void) AICurlThread::sInstance->wakeup_thread(); } +void stopCurlThread(void) +{ + using curlthread::AICurlThread; + if (AICurlThread::sInstance) + { + AICurlThread::sInstance->stop_thread(); + int count = 101; + while(--count && !AICurlThread::sInstance->isStopped()) + { + ms_sleep(10); + } + Dout(dc::curl, "Curl thread" << (curlThreadIsRunning() ? " not" : "") << " stopped after " << ((100 - count) * 10) << "ms."); + } +} + } // namespace AICurlPrivate //----------------------------------------------------------------------------- @@ -994,7 +1082,7 @@ void AICurlEasyRequest::removeRequest(void) { // Write-lock the command queue. - command_queue_wat command_queue_w(command_queue); + command_queue_wat command_queue_w(command_queue); #ifdef SHOW_ASSERT // This debug code checks if we aren't calling removeRequest() twice for the same object. // That means that the thread calling this function already finished it, following from that diff --git a/indra/llmessage/aicurlthread.h b/indra/llmessage/aicurlthread.h index e8b080b94..a2262d9a8 100644 --- a/indra/llmessage/aicurlthread.h +++ b/indra/llmessage/aicurlthread.h @@ -92,7 +92,6 @@ class PollSet private: curl_socket_t* mFileDescriptors; - size_t mSize; // Size of mFileDescriptors array. int mNrFds; // The number of filedescriptors in the array. int mMaxFd; // The largest filedescriptor in the array, or -1 when it is empty. int mNext; // The index of the first file descriptor to start copying, the next call to refresh(). @@ -172,6 +171,7 @@ class MultiHandle : public CurlMultiHandle class AICurlMultiHandle : public AIThreadSafeSingleThreadDC, public LLThreadLocalDataMember { public: static AICurlMultiHandle& getInstance(void); + static void destroyInstance(void); private: // Use getInstance(). AICurlMultiHandle(void) { } diff --git a/indra/newview/llxmlrpctransaction.cpp b/indra/newview/llxmlrpctransaction.cpp index 5abe2df2e..72ccfb936 100644 --- a/indra/newview/llxmlrpctransaction.cpp +++ b/indra/newview/llxmlrpctransaction.cpp @@ -278,6 +278,12 @@ void LLXMLRPCTransaction::Impl::init(XMLRPC_REQUEST request, bool useGzip) mCurlEasyRequestStateMachinePtr->run(boost::bind(&LLXMLRPCTransaction::Impl::curlEasyRequestCallback, this, _1)); setStatus(LLXMLRPCTransaction::StatusStarted); } + else + { + // This deletes the statemachine immediately. + mCurlEasyRequestStateMachinePtr->kill(); + mCurlEasyRequestStateMachinePtr = NULL; + } } LLXMLRPCTransaction::Impl::~Impl() @@ -311,13 +317,19 @@ void LLXMLRPCTransaction::Impl::curlEasyRequestCallback(bool success) { llassert(mStatus == LLXMLRPCTransaction::StatusStarted || mStatus == LLXMLRPCTransaction::StatusDownloading); + AICurlEasyRequestStateMachine* state_machine = mCurlEasyRequestStateMachinePtr; + // We're done with the statemachine, one way or another. + // Set mCurlEasyRequestStateMachinePtr to NULL so we won't call mCurlEasyRequestStateMachinePtr->running() in the destructor. + // Note that the state machine auto-cleaning: it will be deleted by the main-thread after this function returns. + mCurlEasyRequestStateMachinePtr = NULL; + if (!success) { setStatus(LLXMLRPCTransaction::StatusOtherError, "Statemachine failed"); return; } - AICurlEasyRequest_wat curlEasyRequest_w(*mCurlEasyRequestStateMachinePtr->mCurlEasyRequest); + AICurlEasyRequest_wat curlEasyRequest_w(*state_machine->mCurlEasyRequest); CURLcode result; curlEasyRequest_w->getResult(&result, &mTransferInfo); diff --git a/indra/newview/statemachine/aicurleasyrequeststatemachine.cpp b/indra/newview/statemachine/aicurleasyrequeststatemachine.cpp index a308be093..752fe0827 100644 --- a/indra/newview/statemachine/aicurleasyrequeststatemachine.cpp +++ b/indra/newview/statemachine/aicurleasyrequeststatemachine.cpp @@ -138,6 +138,8 @@ void AICurlEasyRequestStateMachine::finish_impl(void) curl_easy_request_w->send_events_to(NULL); curl_easy_request_w->revokeCallbacks(); } + // Auto clean up. + kill(); } AICurlEasyRequestStateMachine::AICurlEasyRequestStateMachine(bool buffered) : mBuffered(buffered), mCurlEasyRequest(buffered)