Merge branch 'curlthreading2' into curlthreading3

Conflicts:
	indra/llmessage/llcurlrequest.cpp

Header file include changes collided. Fixed.
This commit is contained in:
Aleric Inglewood
2012-07-31 17:46:07 +02:00
4 changed files with 64 additions and 39 deletions

View File

@@ -1138,7 +1138,7 @@ CurlResponderBuffer::~CurlResponderBuffer()
// in which case AICurlEasyRequestStateMachine::mTimer times out, but that already
// calls CurlResponderBuffer::timed_out().
llmaybeerrs << "Calling ~CurlResponderBuffer() with active responder!" << llendl;
if (LLApp::isExiting())
if (!LLApp::isRunning())
{
// It might happen if some CurlResponderBuffer escaped clean up somehow :/
mResponder = NULL;
@@ -1164,9 +1164,7 @@ void CurlResponderBuffer::resetState(AICurlEasyRequest_wat& curl_easy_request_w)
curl_easy_request_w->resetState();
mOutput.reset();
mInput.str("");
mInput.clear();
mInput.reset();
mHeaderOutput.str("");
mHeaderOutput.clear();
@@ -1184,6 +1182,10 @@ void CurlResponderBuffer::prepRequest(AICurlEasyRequest_wat& curl_easy_request_w
curl_easy_request_w->setoptString(CURLOPT_ENCODING, "");
}
mInput.reset(new LLBufferArray);
mInput->setThreaded(true);
mLastRead = NULL;
mOutput.reset(new LLBufferArray);
mOutput->setThreaded(true);
@@ -1247,16 +1249,10 @@ size_t CurlResponderBuffer::curlReadCallback(char* data, size_t size, size_t nme
// to make sure that callbacks and destruction aren't done simultaneously.
AICurlEasyRequest_wat buffered_easy_request_w(*lockobj);
S32 bytes = size * nmemb; // The maximum amount to read.
AICurlResponderBuffer_wat buffer_w(*lockobj);
S32 n = size * nmemb;
S32 startpos = buffer_w->getInput().tellg();
buffer_w->getInput().seekg(0, std::ios::end);
S32 endpos = buffer_w->getInput().tellg();
buffer_w->getInput().seekg(startpos, std::ios::beg);
S32 maxn = endpos - startpos;
n = llmin(n, maxn);
buffer_w->getInput().read(data, n);
return n;
buffer_w->mLastRead = buffer_w->getInput()->readAfter(sChannels.out(), buffer_w->mLastRead, (U8*)data, bytes);
return bytes; // Return the amount actually read.
}
//static

View File

@@ -291,9 +291,9 @@ class CurlResponderBuffer : protected AICurlEasyHandleEvents {
void resetState(AICurlEasyRequest_wat& curl_easy_request_w);
void prepRequest(AICurlEasyRequest_wat& buffered_curl_easy_request_w, std::vector<std::string> const& headers, AICurlInterface::ResponderPtr responder, S32 time_out = 0, bool post = false);
std::stringstream& getInput() { return mInput; }
std::stringstream& getHeaderOutput() { return mHeaderOutput; }
LLIOPipe::buffer_ptr_t& getOutput() { return mOutput; }
LLIOPipe::buffer_ptr_t& getInput(void) { return mInput; }
std::stringstream& getHeaderOutput(void) { return mHeaderOutput; }
LLIOPipe::buffer_ptr_t& getOutput(void) { return mOutput; }
// Called if libcurl doesn't deliver within CurlRequestTimeOut seconds.
void timed_out(void);
@@ -310,7 +310,8 @@ class CurlResponderBuffer : protected AICurlEasyHandleEvents {
/*virtual*/ void removed_from_multi_handle(AICurlEasyRequest_wat& curl_easy_request_w);
private:
std::stringstream mInput;
LLIOPipe::buffer_ptr_t mInput;
U8* mLastRead; // Pointer into mInput where we last stopped reading (or NULL to start at the beginning).
std::stringstream mHeaderOutput;
LLIOPipe::buffer_ptr_t mOutput;
AICurlInterface::ResponderPtr mResponder;
@@ -319,7 +320,7 @@ class CurlResponderBuffer : protected AICurlEasyHandleEvents {
S32 mResponseTransferedBytes;
public:
static LLChannelDescriptors const sChannels; // Channel object for mOutput: we ONLY use channel 0, so this can be a constant.
static LLChannelDescriptors const sChannels; // Channel object for mInput (channel out()) and mOutput (channel in()).
private:
// This class may only be created by constructing a ThreadSafeBufferedCurlEasyRequest.

View File

@@ -845,27 +845,50 @@ void AICurlThread::wakeup(AICurlMultiHandle_wat const& multi_handle_w)
// If no process has the pipe open for writing, read() shall return 0 to indicate end-of-file.
// If some process has the pipe open for writing and O_NONBLOCK is set, read() shall return -1 and set errno to [EAGAIN].
char buf[256];
ssize_t len;
do
bool got_data = false;
for(;;)
{
len = read(mWakeUpFd, buf, sizeof(buf));
if (len == -1 && errno == EAGAIN)
ssize_t len = read(mWakeUpFd, buf, sizeof(buf));
if (len > 0)
{
// Data was read from the pipe.
got_data = true;
if (len < sizeof(buf))
break;
}
else if (len == -1)
{
// An error occurred.
if (errno == EAGAIN)
{
if (got_data)
break;
// There was no data, even though select() said so. If this ever happens at all(?), lets just return and enter select() again.
return;
}
else if (errno == EINTR)
{
continue;
}
else
{
llerrs << "read(3) from mWakeUpFd: " << strerror(errno) << llendl;
return;
}
}
else
{
// pipe(2) returned 0.
llwarns << "read(3) from mWakeUpFd returned 0, indicating that the pipe on the other end was closed! Shutting down curl thread." << llendl;
close(mWakeUpFd);
mWakeUpFd = CURL_SOCKET_BAD;
mRunning = false;
return;
}
while(len == -1 && errno == EINTR);
if (len == -1)
{
llerrs << "read(3) from mWakeUpFd: " << strerror(errno) << llendl;
}
if (LL_UNLIKELY(len == 0))
{
llwarns << "read(3) from mWakeUpFd returned 0, indicating that the pipe on the other end was closed! Shutting down curl thread." << llendl;
close(mWakeUpFd);
mWakeUpFd = CURL_SOCKET_BAD;
mRunning = false;
return;
}
}
#endif
// Data was received on mWakeUpFd. This means that the main-thread added one
// or more commands to the command queue and called wakeUpCurlThread().
process_commands(multi_handle_w);
}

View File

@@ -41,6 +41,7 @@
#include "llsdserialize.h"
#include "llcurlrequest.h"
#include "llbufferstream.h"
#include "aicurleasyrequeststatemachine.h"
//-----------------------------------------------------------------------------
@@ -94,8 +95,11 @@ bool Request::post(std::string const& url, headers_t const& headers, std::string
buffer_w->prepRequest(buffered_easy_request_w, headers, responder);
buffer_w->getInput().write(data.data(), data.size());
S32 bytes = buffer_w->getInput().str().length();
U32 bytes = data.size();
bool success = buffer_w->getInput()->append(buffer_w->sChannels.out(), (U8 const*)data.data(), bytes);
llassert_always(success); // AIFIXME: Maybe throw an error.
if (!success)
return false;
buffered_easy_request_w->setPost(NULL, bytes);
buffered_easy_request_w->addHeader("Content-Type: application/octet-stream");
buffered_easy_request_w->finalizeRequest(url);
@@ -121,8 +125,9 @@ bool Request::post(std::string const& url, headers_t const& headers, LLSD const&
buffer_w->prepRequest(buffered_easy_request_w, headers, responder);
LLSDSerialize::toXML(data, buffer_w->getInput());
S32 bytes = buffer_w->getInput().str().length();
LLBufferStream buffer_stream(buffer_w->sChannels, buffer_w->getInput().get());
LLSDSerialize::toXML(data, buffer_stream);
S32 bytes = buffer_w->getInput()->countAfter(buffer_w->sChannels.out(), NULL);
buffered_easy_request_w->setPost(NULL, bytes);
buffered_easy_request_w->addHeader("Content-Type: application/llsd+xml");
buffered_easy_request_w->finalizeRequest(url);