Code hardening, review, bug fixes, documentation, curl stats and cleanup.

Bug fixes:
AICurlEasyRequestStateMachine didn't delete itself.
curl_multi_socket_action calls were made for potentional removed sockets.
The curl thread wasn't terminated.
This commit is contained in:
Aleric Inglewood
2012-07-04 00:10:43 +02:00
parent 9b8e5c8719
commit 125a10bb44
6 changed files with 300 additions and 143 deletions

View File

@@ -132,8 +132,10 @@ namespace curlthread {
//
// mMaxFdSet is the largest filedescriptor in mFdSet or -1 if it is empty.
static size_t const MAXSIZE = std::max(1024, FD_SETSIZE);
// Create an empty PollSet.
PollSet::PollSet(void) : mFileDescriptors(new curl_socket_t [std::max(1024, FD_SETSIZE)]), mSize(std::max(1024, FD_SETSIZE)),
PollSet::PollSet(void) : mFileDescriptors(new curl_socket_t [MAXSIZE]),
mNrFds(0), mMaxFd(-1), mNext(0), mMaxFdSet(-1)
{
FD_ZERO(&mFdSet);
@@ -142,7 +144,7 @@ PollSet::PollSet(void) : mFileDescriptors(new curl_socket_t [std::max(1024, FD_S
// Add filedescriptor s to the PollSet.
void PollSet::add(curl_socket_t s)
{
llassert_always(mNrFds < mSize);
llassert_always(mNrFds < MAXSIZE);
mFileDescriptors[mNrFds++] = s;
mMaxFd = std::max(mMaxFd, s);
}
@@ -159,17 +161,57 @@ void PollSet::remove(curl_socket_t s)
// much clock cycles. Therefore, shift the whole vector
// back, keeping it compact and keeping the filedescriptors
// in the same order (which is supposedly their priority).
//
// The general case is where mFileDescriptors contains s at an index
// between 0 and mNrFds:
// mNrFds = 6
// v
// index: 0 1 2 3 4 5
// a b c s d e
// This function should never be called unless s is actually in mFileDescriptors,
// as a result of a previous call to PollSet::add().
llassert(mNrFds > 0);
// Correct mNrFds for when the descriptor is removed.
// Make i 'point' to the last entry.
int i = --mNrFds;
int prev = mFileDescriptors[i];
// i = NrFds = 5
// v
// index: 0 1 2 3 4 5
// a b c s d e
int prev = mFileDescriptors[i]; // prev = 'e'
int max = -1;
for (--i; i >= 0 && prev != s; --i)
{
int cur = mFileDescriptors[i];
mFileDescriptors[i] = prev;
max = std::max(max, prev);
prev = cur;
int cur = mFileDescriptors[i]; // cur = 'd'
mFileDescriptors[i] = prev; // Overwrite 'd' with 'e'.
max = std::max(max, prev); // max is the maximum value in 'i' or higher.
prev = cur; // prev = 'd'
// i NrFds = 5
// v v
// index: 0 1 2 3 4
// a b c s e // prev = 'd'
//
// Next loop iteration: cur = 's', overwrite 's' with 'd', prev = 's'; loop terminates.
// i NrFds = 5
// v v
// index: 0 1 2 3 4
// a b c d e // prev = 's'
}
llassert(prev == s);
// At this point i was decremented once more and points to the element before the old s.
// i NrFds = 5
// v v
// index: 0 1 2 3 4
// a b c d e // max = std::max('d', 'e')
// If mNext pointed to an element before s, it should be left alone. Otherwise, if mNext pointed
// to s it must now point to 'd', or if it pointed beyond 's' it must be decremented by 1.
if (mNext > i + 1) // i + 1 is where s was.
--mNext;
// If s was the largest file descriptor, we have to update mMaxFd.
if (s == mMaxFd)
{
while (i >= 0)
@@ -182,8 +224,10 @@ void PollSet::remove(curl_socket_t s)
llassert(mMaxFd < s);
llassert((mMaxFd == -1) == (mNrFds == 0));
}
if (mNext == mNrFds)
mNext = 0;
// ALSO make sure that s is no longer set in mFdSet, or we might confuse libcurl by
// calling curl_multi_socket_action for a socket that it told us to remove.
clr(s);
}
bool PollSet::contains(curl_socket_t fd) const
@@ -220,7 +264,7 @@ refresh_t PollSet::refresh(void)
llassert_always(mNext < mNrFds);
// Test if mNrFds is larger than or equal FD_SETSIZE; equal, because we reserve one
// Test if mNrFds is larger than or equal to FD_SETSIZE; equal, because we reserve one
// filedescriptor for the wakeup fd: we copy maximal FD_SETSIZE - 1 filedescriptors.
// If not then we're going to copy everything so that we can save on CPU cycles
// by not calculating mMaxFdSet here.
@@ -228,22 +272,9 @@ refresh_t PollSet::refresh(void)
{
llwarns << "PollSet::reset: More than FD_SETSIZE (" << FD_SETSIZE << ") file descriptors active!" << llendl;
// Calculate mMaxFdSet.
int max = -1;
int count = 0;
int end = mNrFds;
int i = mNext;
while (++count < FD_SETSIZE)
{
max = std::max(max, mFileDescriptors[i]);
if (++i == end)
{
if (end == mNext)
break;
end = mNext;
i = 0;
llassert(i < end); // If mNext == 0 then the while loop terminates before we get here.
}
}
// Run over FD_SETSIZE - 1 elements, starting at mNext, wrapping to 0 when we reach the end.
int max = -1, i = mNext, count = 0;
while (++count < FD_SETSIZE) { max = std::max(max, mFileDescriptors[i]); if (++i == mNrFds) i = 0; }
mMaxFdSet = max;
}
else
@@ -252,7 +283,6 @@ refresh_t PollSet::refresh(void)
mMaxFdSet = mMaxFd;
}
int count = 0;
int end = mNrFds;
int i = mNext;
for(;;)
{
@@ -263,15 +293,13 @@ refresh_t PollSet::refresh(void)
}
FD_SET(mFileDescriptors[i], &mFdSet);
mCopiedFileDescriptors.push_back(mFileDescriptors[i]);
if (++i == end)
if (++i == mNrFds)
{
// If we reached the end and start at the beginning, then we copied everything.
if (mNext == 0)
break;
// If this was true then we got here a second time, which means that we accessed all
// filedescriptors with mNext != 0, but count is still < FD_SETSIZE, which is not
// possible because mNext is only non-zero when mNrFds >= FD_SETSIZE.
llassert(end != mNext);
end = mNext;
// When can only come here if mNrFds >= FD_SETSIZE, hence we can just
// wrap around and terminate on count reaching FD_SETSIZE.
i = 0;
}
}
@@ -280,9 +308,29 @@ refresh_t PollSet::refresh(void)
// FIXME: This needs a rewrite on Windows, as FD_ISSET is slow there; it would make
// more sense to iterate directly over the fd's in mFdSet on Windows.
//
// The API reset(), get() and next() allows one to run over all filedescriptors
// in mFdSet that are set. This works by running only over the filedescriptors
// that were set initially (by the call to refresh()) and then checking if that
// filedescriptor is (still) set in mFdSet.
//
// A call to reset() resets mIter to the beginning, so that get() returns
// the first filedescriptor that is still set. A call to next() progresses
// the iterator to the next set filedescriptor. If get() return -1, then there
// were no more filedescriptors set.
//
// Note that one should never call next() unless get() didn't return -1, so
// the call sequence is:
// refresh();
// /* reset some or all bits in mFdSet */
// reset();
// while (get() != -1) // next();
//
// Note also that this API is only used by MergeIterator, which wraps it
// and provides a different API to use.
void PollSet::reset(void)
{
llassert((mNrFds == 0) == mCopiedFileDescriptors.empty());
if (mCopiedFileDescriptors.empty())
mIter = mCopiedFileDescriptors.end();
else
@@ -298,8 +346,6 @@ inline int PollSet::get(void) const
return (mIter == mCopiedFileDescriptors.end()) ? -1 : *mIter;
}
// FIXME: This needs a rewrite on Windows, as FD_ISSET is slow there; it would make
// more sense to iterate directly over the fd's in mFdSet on Windows.
void PollSet::next(void)
{
llassert(mIter != mCopiedFileDescriptors.end()); // Only call next() if the last call to get() didn't return -1.
@@ -308,6 +354,10 @@ void PollSet::next(void)
//-----------------------------------------------------------------------------
// MergeIterator
//
// This class takes two PollSet's and allows one to run over all filedescriptors
// that are set in one or both poll sets, returning each filedescriptor only
// once, by calling next() until it returns false.
class MergeIterator
{
@@ -343,7 +393,6 @@ bool MergeIterator::next(int& fd_out, int& ev_bitmask_out)
fd_out = rfd;
ev_bitmask_out = CURL_CSELECT_IN | CURL_CSELECT_OUT;
mReadPollSet.next();
}
else if ((unsigned int)rfd < (unsigned int)wfd) // Use and increment smaller one, unless it's -1.
{
@@ -440,6 +489,9 @@ class AICurlThread : public LLThread
// MAIN-THREAD
void wakeup_thread(void);
// MAIN-THREAD
void stop_thread(void) { mRunning = false; wakeup_thread(); }
protected:
virtual void run(void);
void wakeup(AICurlMultiHandle_wat const& multi_handle_w);
@@ -453,13 +505,15 @@ class AICurlThread : public LLThread
int mWakeUpFd;
int mZeroTimeOut;
volatile bool mRunning;
};
// Only the main thread is accessing this.
AICurlThread* AICurlThread::sInstance = NULL;
// MAIN-THREAD
AICurlThread::AICurlThread(void) : LLThread("AICurlThread"), mWakeUpFd_in(-1), mWakeUpFd(-1), mZeroTimeOut(0)
AICurlThread::AICurlThread(void) : LLThread("AICurlThread"), mWakeUpFd_in(-1), mWakeUpFd(-1), mZeroTimeOut(0), mRunning(true)
{
create_wakeup_fds();
sInstance = this;
@@ -567,7 +621,7 @@ void AICurlThread::wakeup(AICurlMultiHandle_wat const& multi_handle_w)
llwarns << "read(3) from mWakeUpFd returned 0, indicating that the pipe on the other end was closed! Shutting down curl thread." << llendl;
close(mWakeUpFd);
mWakeUpFd = -1;
shutdown();
mRunning = false;
return;
}
#endif
@@ -610,30 +664,29 @@ void AICurlThread::run(void)
{
DoutEntering(dc::curl, "AICurlThread::run()");
AICurlMultiHandle_wat multi_handle_w(AICurlMultiHandle::getInstance());
for(;;)
{
refresh_t rres = multi_handle_w->mReadPollSet.refresh();
refresh_t wres = multi_handle_w->mWritePollSet.refresh();
fd_set* read_fd_set = multi_handle_w->mReadPollSet.access();
if (LL_LIKELY(mWakeUpFd != -1))
FD_SET(mWakeUpFd, read_fd_set);
else if ((rres & empty))
read_fd_set = NULL;
fd_set* write_fd_set = ((wres & empty)) ? NULL : multi_handle_w->mWritePollSet.access();
int const max_rfd = std::max(multi_handle_w->mReadPollSet.get_max_fd(), mWakeUpFd);
int const max_wfd = multi_handle_w->mWritePollSet.get_max_fd();
int nfds = std::max(max_rfd, max_wfd) + 1;
llassert(0 <= nfds && nfds <= FD_SETSIZE);
llassert((max_rfd == -1) == (read_fd_set == NULL) &&
(max_wfd == -1) == (write_fd_set == NULL)); // Needed on Windows.
llassert((max_rfd == -1 || multi_handle_w->mReadPollSet.is_set(max_rfd)) &&
(max_wfd == -1 || multi_handle_w->mWritePollSet.is_set(max_wfd)));
int ready = 0;
if (LL_UNLIKELY(nfds == 0)) // Only happens when the thread is shutting down.
ms_sleep(1000);
else
AICurlMultiHandle_wat multi_handle_w(AICurlMultiHandle::getInstance());
while(mRunning)
{
// If mRunning is true then we can only get here if mWakeUpFd != -1.
llassert(mWakeUpFd != -1);
// Copy the next batch of file descriptors from the PollSets mFiledescriptors into their mFdSet.
multi_handle_w->mReadPollSet.refresh();
refresh_t wres = multi_handle_w->mWritePollSet.refresh();
// Add wake up fd if any, and pass NULL to select() if a set is empty.
fd_set* read_fd_set = multi_handle_w->mReadPollSet.access();
FD_SET(mWakeUpFd, read_fd_set);
fd_set* write_fd_set = ((wres & empty)) ? NULL : multi_handle_w->mWritePollSet.access();
// Calculate nfds (ignored on windows).
int const max_rfd = std::max(multi_handle_w->mReadPollSet.get_max_fd(), mWakeUpFd);
int const max_wfd = multi_handle_w->mWritePollSet.get_max_fd();
int nfds = std::max(max_rfd, max_wfd) + 1;
llassert(0 <= nfds && nfds <= FD_SETSIZE);
llassert((max_rfd == -1) == (read_fd_set == NULL) &&
(max_wfd == -1) == (write_fd_set == NULL)); // Needed on Windows.
llassert((max_rfd == -1 || multi_handle_w->mReadPollSet.is_set(max_rfd)) &&
(max_wfd == -1 || multi_handle_w->mWritePollSet.is_set(max_wfd)));
int ready = 0;
struct timeval timeout;
long timeout_ms = multi_handle_w->getTimeOut();
// If no timeout is set, sleep 1 second.
@@ -695,37 +748,41 @@ void AICurlThread::run(void)
if (ready == -1)
last_errno = errno;
#endif
}
// Select returns the total number of bits set in each of the fd_set's (upon return),
// or -1 when an error occurred. A value of 0 means that a timeout occurred.
if (ready == -1)
{
llwarns << "select() failed: " << errno << ", " << strerror(errno) << llendl;
continue;
}
else if (ready == 0)
{
multi_handle_w->socket_action(CURL_SOCKET_TIMEOUT, 0);
}
else
{
if (multi_handle_w->mReadPollSet.is_set(mWakeUpFd))
// Select returns the total number of bits set in each of the fd_set's (upon return),
// or -1 when an error occurred. A value of 0 means that a timeout occurred.
if (ready == -1)
{
wakeup(multi_handle_w);
--ready;
llwarns << "select() failed: " << errno << ", " << strerror(errno) << llendl;
continue;
}
MergeIterator iter(multi_handle_w->mReadPollSet, multi_handle_w->mWritePollSet);
int fd, ev_bitmask;
while (ready > 0 && iter.next(fd, ev_bitmask))
else if (ready == 0)
{
ready -= (ev_bitmask == (CURL_CSELECT_IN|CURL_CSELECT_OUT)) ? 2 : 1;
multi_handle_w->socket_action(fd, ev_bitmask);
llassert(ready >= 0);
multi_handle_w->socket_action(CURL_SOCKET_TIMEOUT, 0);
}
llassert(ready == 0);
else
{
if (multi_handle_w->mReadPollSet.is_set(mWakeUpFd))
{
// Process commands from main-thread. This can add or remove filedescriptors from the poll sets.
wakeup(multi_handle_w);
--ready;
}
// Handle all active filedescriptors.
MergeIterator iter(multi_handle_w->mReadPollSet, multi_handle_w->mWritePollSet);
int fd, ev_bitmask;
while (ready > 0 && iter.next(fd, ev_bitmask))
{
ready -= (ev_bitmask == (CURL_CSELECT_IN|CURL_CSELECT_OUT)) ? 2 : 1;
multi_handle_w->socket_action(fd, ev_bitmask);
llassert(ready >= 0);
}
// Should have handled them all.
llassert(ready == 0);
}
multi_handle_w->check_run_count();
}
multi_handle_w->check_run_count();
}
AICurlMultiHandle::destroyInstance();
}
//-----------------------------------------------------------------------------
@@ -733,10 +790,10 @@ void AICurlThread::run(void)
MultiHandle::MultiHandle(void) : mHandleAddedOrRemoved(false), mPrevRunningHandles(0), mRunningHandles(0), mTimeOut(-1)
{
curl_multi_setopt(mMultiHandle, CURLMOPT_SOCKETFUNCTION, &MultiHandle::socket_callback);
curl_multi_setopt(mMultiHandle, CURLMOPT_SOCKETDATA, this);
curl_multi_setopt(mMultiHandle, CURLMOPT_TIMERFUNCTION, &MultiHandle::timer_callback);
curl_multi_setopt(mMultiHandle, CURLMOPT_TIMERDATA, this);
check_multi_code(curl_multi_setopt(mMultiHandle, CURLMOPT_SOCKETFUNCTION, &MultiHandle::socket_callback));
check_multi_code(curl_multi_setopt(mMultiHandle, CURLMOPT_SOCKETDATA, this));
check_multi_code(curl_multi_setopt(mMultiHandle, CURLMOPT_TIMERFUNCTION, &MultiHandle::timer_callback));
check_multi_code(curl_multi_setopt(mMultiHandle, CURLMOPT_TIMERDATA, this));
}
MultiHandle::~MultiHandle()
@@ -818,7 +875,13 @@ CURLMcode MultiHandle::assign(curl_socket_t sockfd, void* sockptr)
CURLMsg const* MultiHandle::info_read(int* msgs_in_queue) const
{
return curl_multi_info_read(mMultiHandle, msgs_in_queue);
CURLMsg const* ret = curl_multi_info_read(mMultiHandle, msgs_in_queue);
// NULL could be an error, but normally it isn't, so don't print anything and
// never increment Stats::multi_errors. However, lets just increment multi_calls
// when it certainly wasn't an error...
if (ret)
Stats::multi_calls++;
return ret;
}
CURLMcode MultiHandle::add_easy_request(AICurlEasyRequest const& easy_request)
@@ -863,7 +926,8 @@ void MultiHandle::check_run_count(void)
{
CURL* easy = msg->easy_handle;
ThreadSafeCurlEasyRequest* ptr;
curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ptr);
CURLcode rese = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ptr);
llassert_always(rese == CURLE_OK);
AICurlEasyRequest easy_request(ptr);
llassert(*AICurlEasyRequest_wat(*easy_request) == easy);
// Store the result and transfer info in the easy handle.
@@ -904,6 +968,15 @@ void MultiHandle::check_run_count(void)
} // namespace curlthread
} // namespace AICurlPrivate
//static
void AICurlMultiHandle::destroyInstance(void)
{
LLThreadLocalData& tldata = LLThreadLocalData::tldata();
Dout(dc::curl, "Destroying AICurlMultiHandle [" << (void*)tldata.mCurlMultiHandle << "] for thread \"" << tldata.mName << "\".");
delete tldata.mCurlMultiHandle;
tldata.mCurlMultiHandle = NULL;
}
//=============================================================================
// MAIN-THREAD (needing to access the above declarations).
@@ -913,8 +986,8 @@ AICurlMultiHandle& AICurlMultiHandle::getInstance(void)
LLThreadLocalData& tldata = LLThreadLocalData::tldata();
if (!tldata.mCurlMultiHandle)
{
llinfos << "Creating AICurlMultiHandle for thread \"" << tldata.mName << "\"." << llendl;
tldata.mCurlMultiHandle = new AICurlMultiHandle;
Dout(dc::curl, "Created AICurlMultiHandle [" << (void*)tldata.mCurlMultiHandle << "] for thread \"" << tldata.mName << "\".");
}
return *static_cast<AICurlMultiHandle*>(tldata.mCurlMultiHandle);
}
@@ -934,6 +1007,21 @@ void wakeUpCurlThread(void)
AICurlThread::sInstance->wakeup_thread();
}
void stopCurlThread(void)
{
using curlthread::AICurlThread;
if (AICurlThread::sInstance)
{
AICurlThread::sInstance->stop_thread();
int count = 101;
while(--count && !AICurlThread::sInstance->isStopped())
{
ms_sleep(10);
}
Dout(dc::curl, "Curl thread" << (curlThreadIsRunning() ? " not" : "") << " stopped after " << ((100 - count) * 10) << "ms.");
}
}
} // namespace AICurlPrivate
//-----------------------------------------------------------------------------
@@ -994,7 +1082,7 @@ void AICurlEasyRequest::removeRequest(void)
{
// Write-lock the command queue.
command_queue_wat command_queue_w(command_queue);
command_queue_wat command_queue_w(command_queue);
#ifdef SHOW_ASSERT
// This debug code checks if we aren't calling removeRequest() twice for the same object.
// That means that the thread calling this function already finished it, following from that