Added some windows code.

Iterating directly over the elements of fd_set::fd_array in
windows is faster than using FD_ISSET.
This commit is contained in:
Aleric Inglewood
2012-07-04 07:32:24 +02:00
parent 125a10bb44
commit 07e7eeedd1
2 changed files with 117 additions and 53 deletions

View File

@@ -136,7 +136,10 @@ static size_t const MAXSIZE = std::max(1024, FD_SETSIZE);
// Create an empty PollSet. // Create an empty PollSet.
PollSet::PollSet(void) : mFileDescriptors(new curl_socket_t [MAXSIZE]), PollSet::PollSet(void) : mFileDescriptors(new curl_socket_t [MAXSIZE]),
mNrFds(0), mMaxFd(-1), mNext(0), mMaxFdSet(-1) mNrFds(0), mNext(0)
#if !LL_WINDOWS
, mMaxFd(-1), mMaxFdSet(-1)
#endif
{ {
FD_ZERO(&mFdSet); FD_ZERO(&mFdSet);
} }
@@ -146,7 +149,9 @@ void PollSet::add(curl_socket_t s)
{ {
llassert_always(mNrFds < MAXSIZE); llassert_always(mNrFds < MAXSIZE);
mFileDescriptors[mNrFds++] = s; mFileDescriptors[mNrFds++] = s;
#if !LL_WINDOWS
mMaxFd = std::max(mMaxFd, s); mMaxFd = std::max(mMaxFd, s);
#endif
} }
// Remove filedescriptor s from the PollSet. // Remove filedescriptor s from the PollSet.
@@ -180,54 +185,82 @@ void PollSet::remove(curl_socket_t s)
// v // v
// index: 0 1 2 3 4 5 // index: 0 1 2 3 4 5
// a b c s d e // a b c s d e
int prev = mFileDescriptors[i]; // prev = 'e' curl_socket_t cur = mFileDescriptors[i]; // cur = 'e'
int max = -1; #if !LL_WINDOWS
for (--i; i >= 0 && prev != s; --i) curl_socket_t max = -1;
#endif
while (cur != s)
{ {
int cur = mFileDescriptors[i]; // cur = 'd' llassert(i > 0);
mFileDescriptors[i] = prev; // Overwrite 'd' with 'e'. curl_socket_t next = mFileDescriptors[--i]; // next = 'd'
max = std::max(max, prev); // max is the maximum value in 'i' or higher. mFileDescriptors[i] = cur; // Overwrite 'd' with 'e'.
prev = cur; // prev = 'd' #if !LL_WINDOWS
max = std::max(max, cur); // max is the maximum value in 'i' or higher.
#endif
cur = next; // cur = 'd'
// i NrFds = 5 // i NrFds = 5
// v v // v v
// index: 0 1 2 3 4 // index: 0 1 2 3 4
// a b c s e // prev = 'd' // a b c s e // cur = 'd'
// //
// Next loop iteration: cur = 's', overwrite 's' with 'd', prev = 's'; loop terminates. // Next loop iteration: next = 's', overwrite 's' with 'd', cur = 's'; loop terminates.
// i NrFds = 5 // i NrFds = 5
// v v // v v
// index: 0 1 2 3 4 // index: 0 1 2 3 4
// a b c d e // prev = 's' // a b c d e // cur = 's'
} }
llassert(prev == s); llassert(cur == s);
// At this point i was decremented once more and points to the element before the old s. // At this point i was decremented once more and points to the element before the old s.
// i NrFds = 5 // i NrFds = 5
// v v // v v
// index: 0 1 2 3 4 // index: 0 1 2 3 4
// a b c d e // max = std::max('d', 'e') // a b c d e // max = std::max('d', 'e')
// If mNext pointed to an element before s, it should be left alone. Otherwise, if mNext pointed // If mNext pointed to an element before s, it should be left alone. Otherwise, if mNext pointed
// to s it must now point to 'd', or if it pointed beyond 's' it must be decremented by 1. // to s it must now point to 'd', or if it pointed beyond 's' it must be decremented by 1.
if (mNext > i + 1) // i + 1 is where s was. if (mNext > i) // i is where s was.
--mNext; --mNext;
#if !LL_WINDOWS
// If s was the largest file descriptor, we have to update mMaxFd. // If s was the largest file descriptor, we have to update mMaxFd.
if (s == mMaxFd) if (s == mMaxFd)
{ {
while (i >= 0) while (i > 0)
{ {
int cur = mFileDescriptors[i]; curl_socket_t next = mFileDescriptors[--i];
max = std::max(max, cur); max = std::max(max, next);
--i;
} }
mMaxFd = max; mMaxFd = max;
llassert(mMaxFd < s); llassert(mMaxFd < s);
llassert((mMaxFd == -1) == (mNrFds == 0)); llassert((mMaxFd == -1) == (mNrFds == 0));
} }
#endif
// ALSO make sure that s is no longer set in mFdSet, or we might confuse libcurl by // ALSO make sure that s is no longer set in mFdSet, or we might confuse libcurl by
// calling curl_multi_socket_action for a socket that it told us to remove. // calling curl_multi_socket_action for a socket that it told us to remove.
#if !LL_WINDOWS
clr(s); clr(s);
#else
// We have to use a custom implementation here, because we don't want to invalidate mIter.
// This is the same algorithm as above, but with mFdSet.fd_count instead of mNrFds,
// mFdSet.fd_array instead of mFileDescriptors and mIter instead of mNext.
if (FD_ISSET(s, &mFdSet))
{
int i = --mFdSet.fd_count;
llassert(i >= 0);
curl_socket_t cur = mFdSet.fd_array[i];
while (cur != s)
{
llassert(i > 0);
curl_socket_t next = mFileDescriptors[--i];
mFileDescriptors[i] = cur;
cur = next;
}
if (mIter > i)
--mIter;
llassert(mIter <= mFdSet.fd_count);
}
#endif
} }
bool PollSet::contains(curl_socket_t fd) const bool PollSet::contains(curl_socket_t fd) const
@@ -254,11 +287,15 @@ inline void PollSet::clr(curl_socket_t fd)
refresh_t PollSet::refresh(void) refresh_t PollSet::refresh(void)
{ {
FD_ZERO(&mFdSet); FD_ZERO(&mFdSet);
#if !LL_WINDOWS
mCopiedFileDescriptors.clear(); mCopiedFileDescriptors.clear();
#endif
if (mNrFds == 0) if (mNrFds == 0)
{ {
#if !LL_WINDOWS
mMaxFdSet = -1; mMaxFdSet = -1;
#endif
return empty_and_complete; return empty_and_complete;
} }
@@ -271,16 +308,20 @@ refresh_t PollSet::refresh(void)
if (mNrFds >= FD_SETSIZE) if (mNrFds >= FD_SETSIZE)
{ {
llwarns << "PollSet::reset: More than FD_SETSIZE (" << FD_SETSIZE << ") file descriptors active!" << llendl; llwarns << "PollSet::reset: More than FD_SETSIZE (" << FD_SETSIZE << ") file descriptors active!" << llendl;
#if !LL_WINDOWS
// Calculate mMaxFdSet. // Calculate mMaxFdSet.
// Run over FD_SETSIZE - 1 elements, starting at mNext, wrapping to 0 when we reach the end. // Run over FD_SETSIZE - 1 elements, starting at mNext, wrapping to 0 when we reach the end.
int max = -1, i = mNext, count = 0; int max = -1, i = mNext, count = 0;
while (++count < FD_SETSIZE) { max = std::max(max, mFileDescriptors[i]); if (++i == mNrFds) i = 0; } while (++count < FD_SETSIZE) { max = std::max(max, mFileDescriptors[i]); if (++i == mNrFds) i = 0; }
mMaxFdSet = max; mMaxFdSet = max;
#endif
} }
else else
{ {
mNext = 0; // Start at the beginning if we copy everything anyway. mNext = 0; // Start at the beginning if we copy everything anyway.
#if !LL_WINDOWS
mMaxFdSet = mMaxFd; mMaxFdSet = mMaxFd;
#endif
} }
int count = 0; int count = 0;
int i = mNext; int i = mNext;
@@ -292,7 +333,9 @@ refresh_t PollSet::refresh(void)
return not_complete_not_empty; return not_complete_not_empty;
} }
FD_SET(mFileDescriptors[i], &mFdSet); FD_SET(mFileDescriptors[i], &mFdSet);
#if !LL_WINDOWS
mCopiedFileDescriptors.push_back(mFileDescriptors[i]); mCopiedFileDescriptors.push_back(mFileDescriptors[i]);
#endif
if (++i == mNrFds) if (++i == mNrFds)
{ {
// If we reached the end and start at the beginning, then we copied everything. // If we reached the end and start at the beginning, then we copied everything.
@@ -306,9 +349,6 @@ refresh_t PollSet::refresh(void)
return complete_not_empty; return complete_not_empty;
} }
// FIXME: This needs a rewrite on Windows, as FD_ISSET is slow there; it would make
// more sense to iterate directly over the fd's in mFdSet on Windows.
//
// The API reset(), get() and next() allows one to run over all filedescriptors // The API reset(), get() and next() allows one to run over all filedescriptors
// in mFdSet that are set. This works by running only over the filedescriptors // in mFdSet that are set. This works by running only over the filedescriptors
// that were set initially (by the call to refresh()) and then checking if that // that were set initially (by the call to refresh()) and then checking if that
@@ -324,13 +364,16 @@ refresh_t PollSet::refresh(void)
// refresh(); // refresh();
// /* reset some or all bits in mFdSet */ // /* reset some or all bits in mFdSet */
// reset(); // reset();
// while (get() != -1) // next(); // while (get() != CURL_SOCKET_BAD) // next();
// //
// Note also that this API is only used by MergeIterator, which wraps it // Note also that this API is only used by MergeIterator, which wraps it
// and provides a different API to use. // and provides a different API to use.
void PollSet::reset(void) void PollSet::reset(void)
{ {
#if LL_WINDOWS
mIter = 0;
#else
if (mCopiedFileDescriptors.empty()) if (mCopiedFileDescriptors.empty())
mIter = mCopiedFileDescriptors.end(); mIter = mCopiedFileDescriptors.end();
else else
@@ -339,17 +382,27 @@ void PollSet::reset(void)
if (!FD_ISSET(*mIter, &mFdSet)) if (!FD_ISSET(*mIter, &mFdSet))
next(); next();
} }
#endif
} }
inline int PollSet::get(void) const inline curl_socket_t PollSet::get(void) const
{ {
return (mIter == mCopiedFileDescriptors.end()) ? -1 : *mIter; #if LL_WINDOWS
return (mIter >= mFdSet.fd_count) ? CURL_SOCKET_BAD : mFdSet.fd_array[mIter];
#else
return (mIter == mCopiedFileDescriptors.end()) ? CURL_SOCKET_BAD : *mIter;
#endif
} }
void PollSet::next(void) void PollSet::next(void)
{ {
#if LL_WINDOWS
llassert(mIter < mFdSet.fd_count);
++mIter;
#else
llassert(mIter != mCopiedFileDescriptors.end()); // Only call next() if the last call to get() didn't return -1. llassert(mIter != mCopiedFileDescriptors.end()); // Only call next() if the last call to get() didn't return -1.
while (++mIter != mCopiedFileDescriptors.end() && !FD_ISSET(*mIter, &mFdSet)); while (++mIter != mCopiedFileDescriptors.end() && !FD_ISSET(*mIter, &mFdSet));
#endif
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@@ -364,7 +417,7 @@ class MergeIterator
public: public:
MergeIterator(PollSet& readPollSet, PollSet& writePollSet); MergeIterator(PollSet& readPollSet, PollSet& writePollSet);
bool next(int& fd_out, int& ev_bitmask_out); bool next(curl_socket_t& fd_out, int& ev_bitmask_out);
private: private:
PollSet& mReadPollSet; PollSet& mReadPollSet;
@@ -380,12 +433,12 @@ MergeIterator::MergeIterator(PollSet& readPollSet, PollSet& writePollSet) :
mWritePollSet.reset(); mWritePollSet.reset();
} }
bool MergeIterator::next(int& fd_out, int& ev_bitmask_out) bool MergeIterator::next(curl_socket_t& fd_out, int& ev_bitmask_out)
{ {
int rfd = mReadPollSet.get(); curl_socket_t rfd = mReadPollSet.get();
int wfd = mWritePollSet.get(); curl_socket_t wfd = mWritePollSet.get();
if (rfd == -1 && wfd == -1) if (rfd == CURL_SOCKET_BAD && wfd == CURL_SOCKET_BAD)
return false; return false;
if (rfd == wfd) if (rfd == wfd)
@@ -394,12 +447,12 @@ bool MergeIterator::next(int& fd_out, int& ev_bitmask_out)
ev_bitmask_out = CURL_CSELECT_IN | CURL_CSELECT_OUT; ev_bitmask_out = CURL_CSELECT_IN | CURL_CSELECT_OUT;
mReadPollSet.next(); mReadPollSet.next();
} }
else if ((unsigned int)rfd < (unsigned int)wfd) // Use and increment smaller one, unless it's -1. else if (wfd == CURL_SOCKET_BAD || rfd < wfd) // Use and increment smaller one, unless it's CURL_SOCKET_BAD.
{ {
fd_out = rfd; fd_out = rfd;
ev_bitmask_out = CURL_CSELECT_IN; ev_bitmask_out = CURL_CSELECT_IN;
mReadPollSet.next(); mReadPollSet.next();
if (wfd != -1 && mWritePollSet.is_set(rfd)) if (wfd != CURL_SOCKET_BAD && mWritePollSet.is_set(rfd))
{ {
ev_bitmask_out |= CURL_CSELECT_OUT; ev_bitmask_out |= CURL_CSELECT_OUT;
mWritePollSet.clr(rfd); mWritePollSet.clr(rfd);
@@ -410,7 +463,7 @@ bool MergeIterator::next(int& fd_out, int& ev_bitmask_out)
fd_out = wfd; fd_out = wfd;
ev_bitmask_out = CURL_CSELECT_OUT; ev_bitmask_out = CURL_CSELECT_OUT;
mWritePollSet.next(); mWritePollSet.next();
if (rfd != -1 && mReadPollSet.is_set(wfd)) if (rfd != CURL_SOCKET_BAD && mReadPollSet.is_set(wfd))
{ {
ev_bitmask_out |= CURL_CSELECT_IN; ev_bitmask_out |= CURL_CSELECT_IN;
mReadPollSet.clr(wfd); mReadPollSet.clr(wfd);
@@ -501,8 +554,8 @@ class AICurlThread : public LLThread
void create_wakeup_fds(void); void create_wakeup_fds(void);
void cleanup_wakeup_fds(void); void cleanup_wakeup_fds(void);
int mWakeUpFd_in; curl_socket_t mWakeUpFd_in;
int mWakeUpFd; curl_socket_t mWakeUpFd;
int mZeroTimeOut; int mZeroTimeOut;
@@ -513,7 +566,7 @@ class AICurlThread : public LLThread
AICurlThread* AICurlThread::sInstance = NULL; AICurlThread* AICurlThread::sInstance = NULL;
// MAIN-THREAD // MAIN-THREAD
AICurlThread::AICurlThread(void) : LLThread("AICurlThread"), mWakeUpFd_in(-1), mWakeUpFd(-1), mZeroTimeOut(0), mRunning(true) AICurlThread::AICurlThread(void) : LLThread("AICurlThread"), mWakeUpFd_in(CURL_SOCKET_BAD), mWakeUpFd(CURL_SOCKET_BAD), mZeroTimeOut(0), mRunning(true)
{ {
create_wakeup_fds(); create_wakeup_fds();
sInstance = this; sInstance = this;
@@ -554,9 +607,9 @@ void AICurlThread::create_wakeup_fds(void)
// MAIN-THREAD // MAIN-THREAD
void AICurlThread::cleanup_wakeup_fds(void) void AICurlThread::cleanup_wakeup_fds(void)
{ {
if (mWakeUpFd_in != -1) if (mWakeUpFd_in != CURL_SOCKET_BAD)
close(mWakeUpFd_in); close(mWakeUpFd_in);
if (mWakeUpFd != -1) if (mWakeUpFd != CURL_SOCKET_BAD)
close(mWakeUpFd); close(mWakeUpFd);
} }
@@ -620,7 +673,7 @@ void AICurlThread::wakeup(AICurlMultiHandle_wat const& multi_handle_w)
{ {
llwarns << "read(3) from mWakeUpFd returned 0, indicating that the pipe on the other end was closed! Shutting down curl thread." << llendl; llwarns << "read(3) from mWakeUpFd returned 0, indicating that the pipe on the other end was closed! Shutting down curl thread." << llendl;
close(mWakeUpFd); close(mWakeUpFd);
mWakeUpFd = -1; mWakeUpFd = CURL_SOCKET_BAD;
mRunning = false; mRunning = false;
return; return;
} }
@@ -668,8 +721,8 @@ void AICurlThread::run(void)
AICurlMultiHandle_wat multi_handle_w(AICurlMultiHandle::getInstance()); AICurlMultiHandle_wat multi_handle_w(AICurlMultiHandle::getInstance());
while(mRunning) while(mRunning)
{ {
// If mRunning is true then we can only get here if mWakeUpFd != -1. // If mRunning is true then we can only get here if mWakeUpFd != CURL_SOCKET_BAD.
llassert(mWakeUpFd != -1); llassert(mWakeUpFd != CURL_SOCKET_BAD);
// Copy the next batch of file descriptors from the PollSets mFiledescriptors into their mFdSet. // Copy the next batch of file descriptors from the PollSets mFiledescriptors into their mFdSet.
multi_handle_w->mReadPollSet.refresh(); multi_handle_w->mReadPollSet.refresh();
refresh_t wres = multi_handle_w->mWritePollSet.refresh(); refresh_t wres = multi_handle_w->mWritePollSet.refresh();
@@ -678,14 +731,18 @@ void AICurlThread::run(void)
FD_SET(mWakeUpFd, read_fd_set); FD_SET(mWakeUpFd, read_fd_set);
fd_set* write_fd_set = ((wres & empty)) ? NULL : multi_handle_w->mWritePollSet.access(); fd_set* write_fd_set = ((wres & empty)) ? NULL : multi_handle_w->mWritePollSet.access();
// Calculate nfds (ignored on windows). // Calculate nfds (ignored on windows).
int const max_rfd = std::max(multi_handle_w->mReadPollSet.get_max_fd(), mWakeUpFd); #if !LL_WINDOWS
int const max_wfd = multi_handle_w->mWritePollSet.get_max_fd(); curl_socket_t const max_rfd = std::max(multi_handle_w->mReadPollSet.get_max_fd(), mWakeUpFd);
curl_socket_t const max_wfd = multi_handle_w->mWritePollSet.get_max_fd();
int nfds = std::max(max_rfd, max_wfd) + 1; int nfds = std::max(max_rfd, max_wfd) + 1;
llassert(0 <= nfds && nfds <= FD_SETSIZE); llassert(0 <= nfds && nfds <= FD_SETSIZE);
llassert((max_rfd == -1) == (read_fd_set == NULL) && llassert((max_rfd == -1) == (read_fd_set == NULL) &&
(max_wfd == -1) == (write_fd_set == NULL)); // Needed on Windows. (max_wfd == -1) == (write_fd_set == NULL)); // Needed on Windows.
llassert((max_rfd == -1 || multi_handle_w->mReadPollSet.is_set(max_rfd)) && llassert((max_rfd == -1 || multi_handle_w->mReadPollSet.is_set(max_rfd)) &&
(max_wfd == -1 || multi_handle_w->mWritePollSet.is_set(max_wfd))); (max_wfd == -1 || multi_handle_w->mWritePollSet.is_set(max_wfd)));
#else
int nfds = 64;
#endif
int ready = 0; int ready = 0;
struct timeval timeout; struct timeval timeout;
long timeout_ms = multi_handle_w->getTimeOut(); long timeout_ms = multi_handle_w->getTimeOut();
@@ -769,7 +826,8 @@ void AICurlThread::run(void)
} }
// Handle all active filedescriptors. // Handle all active filedescriptors.
MergeIterator iter(multi_handle_w->mReadPollSet, multi_handle_w->mWritePollSet); MergeIterator iter(multi_handle_w->mReadPollSet, multi_handle_w->mWritePollSet);
int fd, ev_bitmask; curl_socket_t fd;
int ev_bitmask;
while (ready > 0 && iter.next(fd, ev_bitmask)) while (ready > 0 && iter.next(fd, ev_bitmask))
{ {
ready -= (ev_bitmask == (CURL_CSELECT_IN|CURL_CSELECT_OUT)) ? 2 : 1; ready -= (ev_bitmask == (CURL_CSELECT_IN|CURL_CSELECT_OUT)) ? 2 : 1;

View File

@@ -72,8 +72,10 @@ class PollSet
// Return a pointer to the underlaying fd_set. // Return a pointer to the underlaying fd_set.
fd_set* access(void) { return &mFdSet; } fd_set* access(void) { return &mFdSet; }
#if !LL_WINDOWS
// Return the largest fd set in mFdSet by refresh. // Return the largest fd set in mFdSet by refresh.
int get_max_fd(void) const { return mMaxFdSet; } curl_socket_t get_max_fd(void) const { return mMaxFdSet; }
#endif
// Return true if a filedescriptor is set in mFileDescriptors (used for debugging). // Return true if a filedescriptor is set in mFileDescriptors (used for debugging).
bool contains(curl_socket_t s) const; bool contains(curl_socket_t s) const;
@@ -85,22 +87,26 @@ class PollSet
void clr(curl_socket_t fd); void clr(curl_socket_t fd);
// Iterate over all file descriptors that were set by refresh and are still set in mFdSet. // Iterate over all file descriptors that were set by refresh and are still set in mFdSet.
void reset(void); // Reset the iterator. void reset(void); // Reset the iterator.
int get(void) const; // Return next filedescriptor, or -1 when there are no more. curl_socket_t get(void) const; // Return next filedescriptor, or CURL_SOCKET_BAD when there are no more.
// Only valid if reset() was called after the last call to refresh(). // Only valid if reset() was called after the last call to refresh().
void next(void); // Advance to next filedescriptor. void next(void); // Advance to next filedescriptor.
private: private:
curl_socket_t* mFileDescriptors; curl_socket_t* mFileDescriptors;
int mNrFds; // The number of filedescriptors in the array. int mNrFds; // The number of filedescriptors in the array.
int mMaxFd; // The largest filedescriptor in the array, or -1 when it is empty. int mNext; // The index of the first file descriptor to start copying, the next call to refresh().
int mNext; // The index of the first file descriptor to start copying, the next call to refresh().
fd_set mFdSet; // Output variable for select(). (Re)initialized by calling refresh(). fd_set mFdSet; // Output variable for select(). (Re)initialized by calling refresh().
int mMaxFdSet; // The largest filedescriptor set in mFdSet by refresh(), or -1 when it was empty.
#if !LL_WINDOWS
curl_socket_t mMaxFd; // The largest filedescriptor in the array, or CURL_SOCKET_BAD when it is empty.
curl_socket_t mMaxFdSet; // The largest filedescriptor set in mFdSet by refresh(), or CURL_SOCKET_BAD when it was empty.
std::vector<curl_socket_t> mCopiedFileDescriptors; // Filedescriptors copied by refresh to mFdSet. std::vector<curl_socket_t> mCopiedFileDescriptors; // Filedescriptors copied by refresh to mFdSet.
std::vector<curl_socket_t>::iterator mIter; // Index into mCopiedFileDescriptors for next(); loop variable. std::vector<curl_socket_t>::iterator mIter; // Index into mCopiedFileDescriptors for next(); loop variable.
#else
int mIter; // Index into fd_set::fd_array.
#endif
}; };
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------