Rename CurlResponderBuffer to BufferedCurlEasyRequest and derive it from CurlEasyRequest

Every curl transaction is a AICurlEasyRequestStateMachine which has a
AICurlEasyRequest as member, which is a reference counting pointer to
a ThreadSafeBufferedCurlEasyRequest. And now BufferedCurlEasyRequest is
derived from CurlEasyRequest which is derived from CurlEasyHandle, but
neither are used separatedly.
This commit is contained in:
Aleric Inglewood
2012-11-02 18:41:23 +01:00
parent 1e1f5e8193
commit 72bde5234a
8 changed files with 170 additions and 272 deletions

View File

@@ -215,14 +215,14 @@ class Command {
Command(AICurlEasyRequest const& easy_request, command_st command) : mCurlEasyRequest(easy_request.get_ptr()), mCommand(command) { }
command_st command(void) const { return mCommand; }
CurlEasyRequestPtr const& easy_request(void) const { return mCurlEasyRequest; }
BufferedCurlEasyRequestPtr const& easy_request(void) const { return mCurlEasyRequest; }
bool operator==(AICurlEasyRequest const& easy_request) const { return mCurlEasyRequest == easy_request.get_ptr(); }
void reset(void);
private:
CurlEasyRequestPtr mCurlEasyRequest;
BufferedCurlEasyRequestPtr mCurlEasyRequest;
command_st mCommand;
};
@@ -237,11 +237,11 @@ void Command::reset(void)
//
// MAIN-THREAD (AICurlEasyRequest::addRequest)
// * command_queue locked
// - A non-active (mActiveMultiHandle is NULL) ThreadSafeCurlEasyRequest (by means of an AICurlEasyRequest pointing to it) is added to command_queue with as command cmd_add.
// - A non-active (mActiveMultiHandle is NULL) ThreadSafeBufferedCurlEasyRequest (by means of an AICurlEasyRequest pointing to it) is added to command_queue with as command cmd_add.
// * command_queue unlocked
//
// If at this point addRequest is called again, then it is detected that the last command added to the queue
// for this ThreadSafeCurlEasyRequest is cmd_add.
// for this ThreadSafeBufferedCurlEasyRequest is cmd_add.
//
// CURL-THREAD (AICurlThread::wakeup):
// * command_queue locked
@@ -251,7 +251,7 @@ void Command::reset(void)
// - The command is removed from command_queue
// * command_queue unlocked
//
// If at this point addRequest is called again, then it is detected that command_being_processed adds the same ThreadSafeCurlEasyRequest.
// If at this point addRequest is called again, then it is detected that command_being_processed adds the same ThreadSafeBufferedCurlEasyRequest.
//
// * command_being_processed is read-locked
// - mActiveMultiHandle is set to point to the curl multi handle
@@ -260,7 +260,7 @@ void Command::reset(void)
// - command_being_processed is reset
// * command_being_processed is unlocked
//
// If at this point addRequest is called again, then it is detected that the ThreadSafeCurlEasyRequest is active.
// If at this point addRequest is called again, then it is detected that the ThreadSafeBufferedCurlEasyRequest is active.
// Multi-threaded queue for passing Command objects from the main-thread to the curl-thread.
AIThreadSafeSimpleDC<std::deque<Command> > command_queue;
@@ -746,7 +746,7 @@ std::ostream& operator<<(std::ostream& os, DebugFdSet const& s)
class CurlSocketInfo
{
public:
CurlSocketInfo(MultiHandle& multi_handle, CURL* easy, curl_socket_t s, int action, ThreadSafeCurlEasyRequest* lockobj);
CurlSocketInfo(MultiHandle& multi_handle, CURL* easy, curl_socket_t s, int action, ThreadSafeBufferedCurlEasyRequest* lockobj);
~CurlSocketInfo();
void set_action(int action);
@@ -760,7 +760,7 @@ class CurlSocketInfo
LLPointer<HTTPTimeout> mTimeout;
};
CurlSocketInfo::CurlSocketInfo(MultiHandle& multi_handle, CURL* easy, curl_socket_t s, int action, ThreadSafeCurlEasyRequest* lockobj) :
CurlSocketInfo::CurlSocketInfo(MultiHandle& multi_handle, CURL* easy, curl_socket_t s, int action, ThreadSafeBufferedCurlEasyRequest* lockobj) :
mMultiHandle(multi_handle), mEasy(easy), mSocketFd(s), mAction(CURL_POLL_NONE), mEasyRequest(lockobj)
{
llassert(*AICurlEasyRequest_wat(*mEasyRequest) == easy);
@@ -1464,7 +1464,7 @@ void MultiHandle::handle_stalls(void)
int MultiHandle::socket_callback(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp)
{
#ifdef CWDEBUG
ThreadSafeCurlEasyRequest* lockobj = NULL;
ThreadSafeBufferedCurlEasyRequest* lockobj = NULL;
curl_easy_getinfo(easy, CURLINFO_PRIVATE, &lockobj);
DoutEntering(dc::curl, "MultiHandle::socket_callback((CURL*)" << (void*)easy << ", " << s <<
", " << action_str(action) << ", " << (void*)userp << ", " << (void*)socketp << ") [CURLINFO_PRIVATE = " << (void*)lockobj << "]");
@@ -1479,7 +1479,7 @@ int MultiHandle::socket_callback(CURL* easy, curl_socket_t s, int action, void*
{
if (!sock_info)
{
ThreadSafeCurlEasyRequest* ptr;
ThreadSafeBufferedCurlEasyRequest* ptr;
CURLcode rese = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ptr);
llassert_always(rese == CURLE_OK);
sock_info = new CurlSocketInfo(self, easy, s, action, ptr);
@@ -1599,7 +1599,7 @@ CURLMcode MultiHandle::remove_easy_request(addedEasyRequests_type::iterator cons
#endif
}
#if CWDEBUG
ThreadSafeCurlEasyRequest* lockobj = iter->get_ptr().get();
ThreadSafeBufferedCurlEasyRequest* lockobj = iter->get_ptr().get();
#endif
mAddedEasyRequests.erase(iter);
Dout(dc::curl, "MultiHandle::remove_easy_request: Removed AICurlEasyRequest " << (void*)lockobj << "; now processing " << mAddedEasyRequests.size() << " easy handles.");
@@ -1626,7 +1626,7 @@ void MultiHandle::check_run_count(void)
if (msg->msg == CURLMSG_DONE)
{
CURL* easy = msg->easy_handle;
ThreadSafeCurlEasyRequest* ptr;
ThreadSafeBufferedCurlEasyRequest* ptr;
CURLcode rese = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ptr);
llassert_always(rese == CURLE_OK);
AICurlEasyRequest easy_request(ptr);
@@ -1973,9 +1973,9 @@ void HTTPTimeout::done(AICurlEasyRequest_wat const& curlEasyRequest_w, CURLcode
DoutCurl("done: mStalled set to -1");
}
void HTTPTimeout::print_diagnostics(AICurlEasyRequest_wat const& curlEasyRequest_w)
void HTTPTimeout::print_diagnostics(CurlEasyRequest const* curl_easy_request)
{
llwarns << "Request to " << curlEasyRequest_w->getLowercaseHostname() << " timed out for " << curlEasyRequest_w->getTimeoutPolicy()->name() << llendl;
llwarns << "Request to " << curl_easy_request->getLowercaseHostname() << " timed out for " << curl_easy_request->getTimeoutPolicy()->name() << llendl;
}
} // namespace curlthread
@@ -2039,48 +2039,25 @@ void stopCurlThread(void)
}
//-----------------------------------------------------------------------------
// CurlResponderBuffer
// BufferedCurlEasyRequest
void CurlResponderBuffer::setStatusAndReason(U32 status, std::string const& reason)
void BufferedCurlEasyRequest::setStatusAndReason(U32 status, std::string const& reason)
{
mStatus = status;
mReason = reason;
}
void CurlResponderBuffer::added_to_multi_handle(AICurlEasyRequest_wat& curl_easy_request_w)
{
llerrs << "Unexpected call to added_to_multi_handle()." << llendl;
}
void CurlResponderBuffer::finished(AICurlEasyRequest_wat& curl_easy_request_w)
{
llerrs << "Unexpected call to finished()." << llendl;
}
void CurlResponderBuffer::removed_from_multi_handle(AICurlEasyRequest_wat& curl_easy_request_w)
{
DoutCurl("Calling CurlResponderBuffer::removed_from_multi_handle(@" << (void*)&*curl_easy_request_w << ") for this = " << (void*)this);
// Lock self.
ThreadSafeBufferedCurlEasyRequest* lockobj = get_lockobj();
llassert(dynamic_cast<ThreadSafeBufferedCurlEasyRequest*>(static_cast<ThreadSafeCurlEasyRequest*>(ThreadSafeCurlEasyRequest::wrapper_cast(&*curl_easy_request_w))) == lockobj);
AICurlResponderBuffer_wat buffer_w(*lockobj);
llassert(&*buffer_w == this);
processOutput(curl_easy_request_w);
}
void CurlResponderBuffer::processOutput(AICurlEasyRequest_wat& curl_easy_request_w)
void BufferedCurlEasyRequest::processOutput(void)
{
U32 responseCode = 0;
std::string responseReason;
CURLcode code;
AITransferInfo info;
curl_easy_request_w->getResult(&code, &info);
getResult(&code, &info);
if (code == CURLE_OK)
{
curl_easy_request_w->getinfo(CURLINFO_RESPONSE_CODE, &responseCode);
getinfo(CURLINFO_RESPONSE_CODE, &responseCode);
// If getResult code is CURLE_OK then we should have decoded the first header line ourselves.
llassert(responseCode == mStatus);
if (responseCode == mStatus)
@@ -2092,12 +2069,12 @@ void CurlResponderBuffer::processOutput(AICurlEasyRequest_wat& curl_easy_request
{
responseCode = HTTP_INTERNAL_ERROR;
responseReason = curl_easy_strerror(code);
curl_easy_request_w->setopt(CURLOPT_FRESH_CONNECT, TRUE);
setopt(CURLOPT_FRESH_CONNECT, TRUE);
}
if (code != CURLE_OK)
{
curl_easy_request_w->print_diagnostics(curl_easy_request_w, code);
print_diagnostics(code);
}
if (mBufferEventsTarget)
{
@@ -2110,43 +2087,42 @@ void CurlResponderBuffer::processOutput(AICurlEasyRequest_wat& curl_easy_request
mResponder->finished(code, responseCode, responseReason, sChannels, mOutput);
mResponder = NULL;
resetState(curl_easy_request_w);
resetState();
}
void CurlResponderBuffer::received_HTTP_header(void)
void BufferedCurlEasyRequest::received_HTTP_header(void)
{
if (mBufferEventsTarget)
mBufferEventsTarget->received_HTTP_header();
}
void CurlResponderBuffer::received_header(std::string const& key, std::string const& value)
void BufferedCurlEasyRequest::received_header(std::string const& key, std::string const& value)
{
if (mBufferEventsTarget)
mBufferEventsTarget->received_header(key, value);
}
void CurlResponderBuffer::completed_headers(U32 status, std::string const& reason, AITransferInfo* info)
void BufferedCurlEasyRequest::completed_headers(U32 status, std::string const& reason, AITransferInfo* info)
{
if (mBufferEventsTarget)
mBufferEventsTarget->completed_headers(status, reason, info);
}
//static
size_t CurlResponderBuffer::curlWriteCallback(char* data, size_t size, size_t nmemb, void* user_data)
size_t BufferedCurlEasyRequest::curlWriteCallback(char* data, size_t size, size_t nmemb, void* user_data)
{
ThreadSafeBufferedCurlEasyRequest* lockobj = static_cast<ThreadSafeBufferedCurlEasyRequest*>(user_data);
// We need to lock the curl easy request object too, because that lock is used
// to make sure that callbacks and destruction aren't done simultaneously.
AICurlEasyRequest_wat buffered_easy_request_w(*lockobj);
AICurlEasyRequest_wat self_w(*lockobj);
S32 bytes = size * nmemb; // The amount to write.
AICurlResponderBuffer_wat buffer_w(*lockobj);
// CurlResponderBuffer::setBodyLimit is never called, so buffer_w->mBodyLimit is infinite.
// BufferedCurlEasyRequest::setBodyLimit is never called, so buffer_w->mBodyLimit is infinite.
//S32 bytes = llmin(size * nmemb, buffer_w->mBodyLimit); buffer_w->mBodyLimit -= bytes;
buffer_w->getOutput()->append(sChannels.in(), (U8 const*)data, bytes);
buffer_w->mResponseTransferedBytes += bytes; // Accumulate data received from the server.
if (buffered_easy_request_w->httptimeout()->data_received(bytes)) // Update timeout administration.
self_w->getOutput()->append(sChannels.in(), (U8 const*)data, bytes);
self_w->mResponseTransferedBytes += bytes; // Accumulate data received from the server.
if (self_w->httptimeout()->data_received(bytes)) // Update timeout administration.
{
// Transfer timed out. Return 0 which will abort with error CURLE_WRITE_ERROR.
return 0;
@@ -2155,23 +2131,22 @@ size_t CurlResponderBuffer::curlWriteCallback(char* data, size_t size, size_t nm
}
//static
size_t CurlResponderBuffer::curlReadCallback(char* data, size_t size, size_t nmemb, void* user_data)
size_t BufferedCurlEasyRequest::curlReadCallback(char* data, size_t size, size_t nmemb, void* user_data)
{
ThreadSafeBufferedCurlEasyRequest* lockobj = static_cast<ThreadSafeBufferedCurlEasyRequest*>(user_data);
// We need to lock the curl easy request object too, because that lock is used
// to make sure that callbacks and destruction aren't done simultaneously.
AICurlEasyRequest_wat buffered_easy_request_w(*lockobj);
AICurlEasyRequest_wat self_w(*lockobj);
S32 bytes = size * nmemb; // The maximum amount to read.
AICurlResponderBuffer_wat buffer_w(*lockobj);
buffer_w->mLastRead = buffer_w->getInput()->readAfter(sChannels.out(), buffer_w->mLastRead, (U8*)data, bytes);
buffer_w->mRequestTransferedBytes += bytes; // Accumulate data sent to the server.
self_w->mLastRead = self_w->getInput()->readAfter(sChannels.out(), self_w->mLastRead, (U8*)data, bytes);
self_w->mRequestTransferedBytes += bytes; // Accumulate data sent to the server.
// Timeout administration. Note that it can happen that we get here
// before the socket callback has been called, because the silly libcurl
// writes headers without informing us. In that case it's OK to create
// the Timeout object on the fly, so pass lockobj.
if (buffered_easy_request_w->httptimeout(lockobj)->data_sent(bytes))
if (self_w->httptimeout(lockobj)->data_sent(bytes))
{
// Transfer timed out. Return CURL_READFUNC_ABORT which will abort with error CURLE_ABORTED_BY_CALLBACK.
return CURL_READFUNC_ABORT;
@@ -2180,19 +2155,19 @@ size_t CurlResponderBuffer::curlReadCallback(char* data, size_t size, size_t nme
}
//static
size_t CurlResponderBuffer::curlHeaderCallback(char* data, size_t size, size_t nmemb, void* user_data)
size_t BufferedCurlEasyRequest::curlHeaderCallback(char* data, size_t size, size_t nmemb, void* user_data)
{
ThreadSafeBufferedCurlEasyRequest* lockobj = static_cast<ThreadSafeBufferedCurlEasyRequest*>(user_data);
// We need to lock the curl easy request object, because that lock is used
// to make sure that callbacks and destruction aren't done simultaneously.
AICurlEasyRequest_wat buffered_easy_request_w(*lockobj);
AICurlEasyRequest_wat self_w(*lockobj);
// This used to be headerCallback() in llurlrequest.cpp.
char const* const header_line = static_cast<char const*>(data);
size_t const header_len = size * nmemb;
if (buffered_easy_request_w->httptimeout()->data_received(header_len)) // Update timeout administration.
if (self_w->httptimeout()->data_received(header_len)) // Update timeout administration.
{
// Transfer timed out. Return 0 which will abort with error CURLE_WRITE_ERROR.
return 0;
@@ -2231,9 +2206,8 @@ size_t CurlResponderBuffer::curlHeaderCallback(char* data, size_t size, size_t n
llwarns << "Received broken header line from server: \"" << header << "\"" << llendl;
}
{
AICurlResponderBuffer_wat curl_responder_buffer_w(*lockobj);
curl_responder_buffer_w->received_HTTP_header();
curl_responder_buffer_w->setStatusAndReason(status, reason);
self_w->received_HTTP_header();
self_w->setStatusAndReason(status, reason);
}
return header_len;
}
@@ -2248,7 +2222,7 @@ size_t CurlResponderBuffer::curlHeaderCallback(char* data, size_t size, size_t n
key = utf8str_tolower(utf8str_trim(key));
value = utf8str_trim(value);
AICurlResponderBuffer_wat(*lockobj)->received_header(key, value);
self_w->received_header(key, value);
}
else
{
@@ -2268,7 +2242,7 @@ int debug_callback(CURL*, curl_infotype infotype, char* buf, size_t size, void*
#ifdef CWDEBUG
using namespace ::libcwd;
CurlEasyRequest* request = (CurlEasyRequest*)user_ptr;
BufferedCurlEasyRequest* request = (BufferedCurlEasyRequest*)user_ptr;
std::ostringstream marker;
marker << (void*)request->get_lockobj();
libcw_do.push_marker();