Threaded cURL. Consider experimental (what in this branch isn't?), however it drastically reduces hitching for me... so yup.

This commit is contained in:
Shyotl
2011-07-31 02:53:26 -05:00
parent d397513840
commit 28568add4c
3 changed files with 110 additions and 28 deletions

View File

@@ -620,11 +620,18 @@ void LLCurl::Easy::prepRequest(const std::string& url,
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
class LLCurl::Multi class LLCurl::Multi : public LLThread
{ {
LOG_CLASS(Multi); LOG_CLASS(Multi);
public: public:
typedef enum
{
PERFORM_STATE_READY=0,
PERFORM_STATE_PERFORMING=1,
PERFORM_STATE_COMPLETED=2
} ePerformState;
Multi(); Multi();
~Multi(); ~Multi();
@@ -634,13 +641,20 @@ public:
void removeEasy(Easy* easy); void removeEasy(Easy* easy);
S32 process(); S32 process();
S32 perform(); void perform();
virtual void run();
CURLMsg* info_read(S32* msgs_in_queue); CURLMsg* info_read(S32* msgs_in_queue);
S32 mQueued; S32 mQueued;
S32 mErrorCount; S32 mErrorCount;
S32 mPerformState;
LLCondition* mSignal;
bool mQuitting;
private: private:
void easyFree(Easy*); void easyFree(Easy*);
@@ -655,21 +669,32 @@ private:
}; };
LLCurl::Multi::Multi() LLCurl::Multi::Multi()
: mQueued(0), : LLThread("Curl Multi"),
mErrorCount(0) mQueued(0),
mErrorCount(0),
mPerformState(PERFORM_STATE_READY)
{ {
mQuitting = false;
mSignal = new LLCondition;
mCurlMultiHandle = curl_multi_init(); mCurlMultiHandle = curl_multi_init();
if (!mCurlMultiHandle) if (!mCurlMultiHandle)
{ {
llwarns << "curl_multi_init() returned NULL! Easy handles: " << gCurlEasyCount << " Multi handles: " << gCurlMultiCount << llendl; llwarns << "curl_multi_init() returned NULL! Easy handles: " << gCurlEasyCount << " Multi handles: " << gCurlMultiCount << llendl;
mCurlMultiHandle = curl_multi_init(); mCurlMultiHandle = curl_multi_init();
} }
llassert_always(mCurlMultiHandle); llassert_always(mCurlMultiHandle);
++gCurlMultiCount; ++gCurlMultiCount;
} }
LLCurl::Multi::~Multi() LLCurl::Multi::~Multi()
{ {
llassert(isStopped());
delete mSignal;
mSignal = NULL;
// Clean up active // Clean up active
for(easy_active_list_t::iterator iter = mEasyActiveList.begin(); for(easy_active_list_t::iterator iter = mEasyActiveList.begin();
iter != mEasyActiveList.end(); ++iter) iter != mEasyActiveList.end(); ++iter)
@@ -695,30 +720,50 @@ CURLMsg* LLCurl::Multi::info_read(S32* msgs_in_queue)
return curlmsg; return curlmsg;
} }
void LLCurl::Multi::perform()
S32 LLCurl::Multi::perform()
{ {
S32 q = 0; if (mPerformState == PERFORM_STATE_READY)
for (S32 call_count = 0;
call_count < MULTI_PERFORM_CALL_REPEAT;
call_count += 1)
{ {
CURLMcode code = curl_multi_perform(mCurlMultiHandle, &q); mSignal->signal();
if (CURLM_CALL_MULTI_PERFORM != code || q == 0) }
{ }
check_curl_multi_code(code);
break; void LLCurl::Multi::run()
} {
while (!mQuitting)
{
mSignal->wait();
mPerformState = PERFORM_STATE_PERFORMING;
if (!mQuitting)
{
S32 q = 0;
for (S32 call_count = 0;
call_count < MULTI_PERFORM_CALL_REPEAT;
call_count += 1)
{
CURLMcode code = curl_multi_perform(mCurlMultiHandle, &q);
if (CURLM_CALL_MULTI_PERFORM != code || q == 0)
{
check_curl_multi_code(code);
break;
}
}
mQueued = q;
mPerformState = PERFORM_STATE_COMPLETED;
}
} }
mQueued = q;
return q;
} }
S32 LLCurl::Multi::process() S32 LLCurl::Multi::process()
{ {
perform(); perform();
if (mPerformState != PERFORM_STATE_COMPLETED)
{
return 0;
}
CURLMsg* msg; CURLMsg* msg;
int msgs_in_queue; int msgs_in_queue;
@@ -749,6 +794,8 @@ S32 LLCurl::Multi::process()
} }
} }
} }
mPerformState = PERFORM_STATE_READY;
return processed; return processed;
} }
@@ -827,6 +874,18 @@ LLCurlRequest::LLCurlRequest() :
LLCurlRequest::~LLCurlRequest() LLCurlRequest::~LLCurlRequest()
{ {
llassert_always(mThreadID == LLThread::currentID()); llassert_always(mThreadID == LLThread::currentID());
//stop all Multi handle background threads
for (curlmulti_set_t::iterator iter = mMultiSet.begin(); iter != mMultiSet.end(); ++iter)
{
LLCurl::Multi* multi = *iter;
multi->mQuitting = true;
while (!multi->isStopped())
{
multi->mSignal->signal();
apr_sleep(1000);
}
}
for_each(mMultiSet.begin(), mMultiSet.end(), DeletePointer()); for_each(mMultiSet.begin(), mMultiSet.end(), DeletePointer());
} }
@@ -834,6 +893,7 @@ void LLCurlRequest::addMulti()
{ {
llassert_always(mThreadID == LLThread::currentID()); llassert_always(mThreadID == LLThread::currentID());
LLCurl::Multi* multi = new LLCurl::Multi(); LLCurl::Multi* multi = new LLCurl::Multi();
multi->start();
mMultiSet.insert(multi); mMultiSet.insert(multi);
mActiveMulti = multi; mActiveMulti = multi;
mActiveRequestCount = 0; mActiveRequestCount = 0;
@@ -963,6 +1023,13 @@ S32 LLCurlRequest::process()
if (multi != mActiveMulti && tres == 0 && multi->mQueued == 0) if (multi != mActiveMulti && tres == 0 && multi->mQueued == 0)
{ {
mMultiSet.erase(curiter); mMultiSet.erase(curiter);
multi->mQuitting = true;
while (!multi->isStopped())
{
multi->mSignal->signal();
apr_sleep(1000);
}
delete multi; delete multi;
} }
} }
@@ -993,6 +1060,7 @@ LLCurlEasyRequest::LLCurlEasyRequest()
mResultReturned(false) mResultReturned(false)
{ {
mMulti = new LLCurl::Multi(); mMulti = new LLCurl::Multi();
mMulti->start();
mEasy = mMulti->allocEasy(); mEasy = mMulti->allocEasy();
if (mEasy) if (mEasy)
{ {
@@ -1003,6 +1071,12 @@ LLCurlEasyRequest::LLCurlEasyRequest()
LLCurlEasyRequest::~LLCurlEasyRequest() LLCurlEasyRequest::~LLCurlEasyRequest()
{ {
mMulti->mQuitting = true;
while (!mMulti->isStopped())
{
mMulti->mSignal->signal();
apr_sleep(1000);
}
delete mMulti; delete mMulti;
} }
@@ -1099,14 +1173,20 @@ void LLCurlEasyRequest::requestComplete()
} }
} }
S32 LLCurlEasyRequest::perform() void LLCurlEasyRequest::perform()
{ {
return mMulti->perform(); mMulti->perform();
} }
// Usage: Call getRestult until it returns false (no more messages) // Usage: Call getRestult until it returns false (no more messages)
bool LLCurlEasyRequest::getResult(CURLcode* result, LLCurl::TransferInfo* info) bool LLCurlEasyRequest::getResult(CURLcode* result, LLCurl::TransferInfo* info)
{ {
if (mMulti->mPerformState != LLCurl::Multi::PERFORM_STATE_COMPLETED)
{ //we're busy, try again later
return false;
}
mMulti->mPerformState = LLCurl::Multi::PERFORM_STATE_READY;
if (!mEasy) if (!mEasy)
{ {
// Special case - we failed to initialize a curl_easy (can happen if too many open files) // Special case - we failed to initialize a curl_easy (can happen if too many open files)

View File

@@ -241,7 +241,7 @@ public:
void slist_append(const char* str); void slist_append(const char* str);
void sendRequest(const std::string& url); void sendRequest(const std::string& url);
void requestComplete(); void requestComplete();
S32 perform(); void perform();
bool getResult(CURLcode* result, LLCurl::TransferInfo* info = NULL); bool getResult(CURLcode* result, LLCurl::TransferInfo* info = NULL);
std::string getErrorString(); std::string getErrorString();

View File

@@ -319,16 +319,18 @@ bool LLXMLRPCTransaction::Impl::process()
} }
} }
const F32 MAX_PROCESSING_TIME = 0.05f; //const F32 MAX_PROCESSING_TIME = 0.05f;
LLTimer timer; //LLTimer timer;
while (mCurlRequest->perform() > 0) mCurlRequest->perform();
/*while (mCurlRequest->perform() > 0)
{ {
if (timer.getElapsedTimeF32() >= MAX_PROCESSING_TIME) if (timer.getElapsedTimeF32() >= MAX_PROCESSING_TIME)
{ {
return false; return false;
} }
} }*/
while(1) while(1)
{ {