diff --git a/indra/aistatemachine/aicurl.cpp b/indra/aistatemachine/aicurl.cpp index 51325e6c2..090876f4e 100644 --- a/indra/aistatemachine/aicurl.cpp +++ b/indra/aistatemachine/aicurl.cpp @@ -1138,7 +1138,7 @@ CurlResponderBuffer::~CurlResponderBuffer() // in which case AICurlEasyRequestStateMachine::mTimer times out, but that already // calls CurlResponderBuffer::timed_out(). llmaybeerrs << "Calling ~CurlResponderBuffer() with active responder!" << llendl; - if (LLApp::isExiting()) + if (!LLApp::isRunning()) { // It might happen if some CurlResponderBuffer escaped clean up somehow :/ mResponder = NULL; @@ -1164,9 +1164,7 @@ void CurlResponderBuffer::resetState(AICurlEasyRequest_wat& curl_easy_request_w) curl_easy_request_w->resetState(); mOutput.reset(); - - mInput.str(""); - mInput.clear(); + mInput.reset(); mHeaderOutput.str(""); mHeaderOutput.clear(); @@ -1184,6 +1182,10 @@ void CurlResponderBuffer::prepRequest(AICurlEasyRequest_wat& curl_easy_request_w curl_easy_request_w->setoptString(CURLOPT_ENCODING, ""); } + mInput.reset(new LLBufferArray); + mInput->setThreaded(true); + mLastRead = NULL; + mOutput.reset(new LLBufferArray); mOutput->setThreaded(true); @@ -1247,16 +1249,10 @@ size_t CurlResponderBuffer::curlReadCallback(char* data, size_t size, size_t nme // to make sure that callbacks and destruction aren't done simultaneously. AICurlEasyRequest_wat buffered_easy_request_w(*lockobj); + S32 bytes = size * nmemb; // The maximum amount to read. AICurlResponderBuffer_wat buffer_w(*lockobj); - S32 n = size * nmemb; - S32 startpos = buffer_w->getInput().tellg(); - buffer_w->getInput().seekg(0, std::ios::end); - S32 endpos = buffer_w->getInput().tellg(); - buffer_w->getInput().seekg(startpos, std::ios::beg); - S32 maxn = endpos - startpos; - n = llmin(n, maxn); - buffer_w->getInput().read(data, n); - return n; + buffer_w->mLastRead = buffer_w->getInput()->readAfter(sChannels.out(), buffer_w->mLastRead, (U8*)data, bytes); + return bytes; // Return the amount actually read. } //static diff --git a/indra/aistatemachine/aicurlprivate.h b/indra/aistatemachine/aicurlprivate.h index 56b2d58ea..acd3a808c 100644 --- a/indra/aistatemachine/aicurlprivate.h +++ b/indra/aistatemachine/aicurlprivate.h @@ -291,9 +291,9 @@ class CurlResponderBuffer : protected AICurlEasyHandleEvents { void resetState(AICurlEasyRequest_wat& curl_easy_request_w); void prepRequest(AICurlEasyRequest_wat& buffered_curl_easy_request_w, std::vector const& headers, AICurlInterface::ResponderPtr responder, S32 time_out = 0, bool post = false); - std::stringstream& getInput() { return mInput; } - std::stringstream& getHeaderOutput() { return mHeaderOutput; } - LLIOPipe::buffer_ptr_t& getOutput() { return mOutput; } + LLIOPipe::buffer_ptr_t& getInput(void) { return mInput; } + std::stringstream& getHeaderOutput(void) { return mHeaderOutput; } + LLIOPipe::buffer_ptr_t& getOutput(void) { return mOutput; } // Called if libcurl doesn't deliver within CurlRequestTimeOut seconds. void timed_out(void); @@ -310,7 +310,8 @@ class CurlResponderBuffer : protected AICurlEasyHandleEvents { /*virtual*/ void removed_from_multi_handle(AICurlEasyRequest_wat& curl_easy_request_w); private: - std::stringstream mInput; + LLIOPipe::buffer_ptr_t mInput; + U8* mLastRead; // Pointer into mInput where we last stopped reading (or NULL to start at the beginning). std::stringstream mHeaderOutput; LLIOPipe::buffer_ptr_t mOutput; AICurlInterface::ResponderPtr mResponder; @@ -319,7 +320,7 @@ class CurlResponderBuffer : protected AICurlEasyHandleEvents { S32 mResponseTransferedBytes; public: - static LLChannelDescriptors const sChannels; // Channel object for mOutput: we ONLY use channel 0, so this can be a constant. + static LLChannelDescriptors const sChannels; // Channel object for mInput (channel out()) and mOutput (channel in()). private: // This class may only be created by constructing a ThreadSafeBufferedCurlEasyRequest. diff --git a/indra/aistatemachine/aicurlthread.cpp b/indra/aistatemachine/aicurlthread.cpp index 2bb33e1a5..c75912047 100644 --- a/indra/aistatemachine/aicurlthread.cpp +++ b/indra/aistatemachine/aicurlthread.cpp @@ -845,27 +845,50 @@ void AICurlThread::wakeup(AICurlMultiHandle_wat const& multi_handle_w) // If no process has the pipe open for writing, read() shall return 0 to indicate end-of-file. // If some process has the pipe open for writing and O_NONBLOCK is set, read() shall return -1 and set errno to [EAGAIN]. char buf[256]; - ssize_t len; - do + bool got_data = false; + for(;;) { - len = read(mWakeUpFd, buf, sizeof(buf)); - if (len == -1 && errno == EAGAIN) + ssize_t len = read(mWakeUpFd, buf, sizeof(buf)); + if (len > 0) + { + // Data was read from the pipe. + got_data = true; + if (len < sizeof(buf)) + break; + } + else if (len == -1) + { + // An error occurred. + if (errno == EAGAIN) + { + if (got_data) + break; + // There was no data, even though select() said so. If this ever happens at all(?), lets just return and enter select() again. + return; + } + else if (errno == EINTR) + { + continue; + } + else + { + llerrs << "read(3) from mWakeUpFd: " << strerror(errno) << llendl; + return; + } + } + else + { + // pipe(2) returned 0. + llwarns << "read(3) from mWakeUpFd returned 0, indicating that the pipe on the other end was closed! Shutting down curl thread." << llendl; + close(mWakeUpFd); + mWakeUpFd = CURL_SOCKET_BAD; + mRunning = false; return; - } - while(len == -1 && errno == EINTR); - if (len == -1) - { - llerrs << "read(3) from mWakeUpFd: " << strerror(errno) << llendl; - } - if (LL_UNLIKELY(len == 0)) - { - llwarns << "read(3) from mWakeUpFd returned 0, indicating that the pipe on the other end was closed! Shutting down curl thread." << llendl; - close(mWakeUpFd); - mWakeUpFd = CURL_SOCKET_BAD; - mRunning = false; - return; + } } #endif + // Data was received on mWakeUpFd. This means that the main-thread added one + // or more commands to the command queue and called wakeUpCurlThread(). process_commands(multi_handle_w); } diff --git a/indra/llmessage/llcurlrequest.cpp b/indra/llmessage/llcurlrequest.cpp index 89338dfce..c1772db25 100644 --- a/indra/llmessage/llcurlrequest.cpp +++ b/indra/llmessage/llcurlrequest.cpp @@ -41,6 +41,7 @@ #include "llsdserialize.h" #include "llcurlrequest.h" +#include "llbufferstream.h" #include "aicurleasyrequeststatemachine.h" //----------------------------------------------------------------------------- @@ -94,8 +95,11 @@ bool Request::post(std::string const& url, headers_t const& headers, std::string buffer_w->prepRequest(buffered_easy_request_w, headers, responder); - buffer_w->getInput().write(data.data(), data.size()); - S32 bytes = buffer_w->getInput().str().length(); + U32 bytes = data.size(); + bool success = buffer_w->getInput()->append(buffer_w->sChannels.out(), (U8 const*)data.data(), bytes); + llassert_always(success); // AIFIXME: Maybe throw an error. + if (!success) + return false; buffered_easy_request_w->setPost(NULL, bytes); buffered_easy_request_w->addHeader("Content-Type: application/octet-stream"); buffered_easy_request_w->finalizeRequest(url); @@ -121,8 +125,9 @@ bool Request::post(std::string const& url, headers_t const& headers, LLSD const& buffer_w->prepRequest(buffered_easy_request_w, headers, responder); - LLSDSerialize::toXML(data, buffer_w->getInput()); - S32 bytes = buffer_w->getInput().str().length(); + LLBufferStream buffer_stream(buffer_w->sChannels, buffer_w->getInput().get()); + LLSDSerialize::toXML(data, buffer_stream); + S32 bytes = buffer_w->getInput()->countAfter(buffer_w->sChannels.out(), NULL); buffered_easy_request_w->setPost(NULL, bytes); buffered_easy_request_w->addHeader("Content-Type: application/llsd+xml"); buffered_easy_request_w->finalizeRequest(url);