Note that in the code, and still, has_curl_request was always false. However, instead of deleting all code paths that are only executed when has_curl_request would be true, I fixed the code to work as intended with my current implementation; which also results in LLCurlRequests to never expire. This way things won't break unexpectedly when this ever changes. Since on this branch isValid was only called still (the rest was removed already) to check if the curl download expired, I took the liberty to rename isValid to hasNotExpired.
1209 lines
28 KiB
C++
1209 lines
28 KiB
C++
/**
|
|
* @file llpumpio.cpp
|
|
* @author Phoenix
|
|
* @date 2004-11-21
|
|
* @brief Implementation of the i/o pump and related functions.
|
|
*
|
|
* $LicenseInfo:firstyear=2004&license=viewerlgpl$
|
|
* Second Life Viewer Source Code
|
|
* Copyright (C) 2010, Linden Research, Inc.
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Lesser General Public
|
|
* License as published by the Free Software Foundation;
|
|
* version 2.1 of the License only.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public
|
|
* License along with this library; if not, write to the Free Software
|
|
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
*
|
|
* Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA
|
|
* $/LicenseInfo$
|
|
*/
|
|
|
|
#include "linden_common.h"
|
|
#include "llpumpio.h"
|
|
|
|
#include <map>
|
|
#include <set>
|
|
#include "apr_poll.h"
|
|
|
|
#include "llapr.h"
|
|
#include "llfasttimer.h"
|
|
#include "llmemtype.h"
|
|
#include "llstl.h"
|
|
#include "llstat.h"
|
|
#include "llthread.h"
|
|
|
|
// These should not be enabled in production, but they can be
|
|
// intensely useful during development for finding certain kinds of
|
|
// bugs.
|
|
#if LL_LINUX
|
|
//#define LL_DEBUG_PIPE_TYPE_IN_PUMP 1
|
|
//#define LL_DEBUG_POLL_FILE_DESCRIPTORS 1
|
|
#if LL_DEBUG_POLL_FILE_DESCRIPTORS
|
|
#include "apr_portable.h"
|
|
#endif
|
|
#endif
|
|
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
#include <typeinfo>
|
|
#endif
|
|
|
|
// constants for poll timeout. if we are threading, we want to have a
|
|
// longer poll timeout.
|
|
#if LL_THREADS_APR
|
|
static const S32 DEFAULT_POLL_TIMEOUT = 1000;
|
|
#else
|
|
static const S32 DEFAULT_POLL_TIMEOUT = 0;
|
|
#endif
|
|
|
|
// The default (and fallback) expiration time for chains
|
|
const F32 DEFAULT_CHAIN_EXPIRY_SECS = 30.0f;
|
|
extern const F32 SHORT_CHAIN_EXPIRY_SECS = 1.0f;
|
|
extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;
|
|
|
|
// sorta spammy debug modes.
|
|
//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR 1
|
|
//#define LL_DEBUG_PROCESS_LINK 1
|
|
//#define LL_DEBUG_PROCESS_RETURN_VALUE 1
|
|
|
|
// Super spammy debug mode.
|
|
//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1
|
|
//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1
|
|
|
|
//
|
|
// local functions
|
|
//
|
|
void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll)
|
|
{
|
|
#if LL_DEBUG_POLL_FILE_DESCRIPTORS
|
|
if(!poll)
|
|
{
|
|
lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl;
|
|
return;
|
|
}
|
|
if(poll->desc.s)
|
|
{
|
|
apr_os_sock_t os_sock;
|
|
if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s))
|
|
{
|
|
lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock
|
|
<< " at " << poll->desc.s << llendl;
|
|
}
|
|
else
|
|
{
|
|
lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
|
|
<< " at " << poll->desc.s << llendl;
|
|
}
|
|
}
|
|
else if(poll->desc.f)
|
|
{
|
|
apr_os_file_t os_file;
|
|
if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f))
|
|
{
|
|
lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file
|
|
<< " at " << poll->desc.f << llendl;
|
|
}
|
|
else
|
|
{
|
|
lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
|
|
<< " at " << poll->desc.f << llendl;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl;
|
|
}
|
|
#endif
|
|
}
|
|
|
|
/**
|
|
* @class
|
|
*/
|
|
class LLChainSleeper : public LLRunnable
|
|
{
|
|
public:
|
|
static LLRunner::run_ptr_t build(LLPumpIO* pump, S32 key)
|
|
{
|
|
return LLRunner::run_ptr_t(new LLChainSleeper(pump, key));
|
|
}
|
|
|
|
virtual void run(LLRunner* runner, S64 handle)
|
|
{
|
|
mPump->clearLock(mKey);
|
|
}
|
|
|
|
protected:
|
|
LLChainSleeper(LLPumpIO* pump, S32 key) : mPump(pump), mKey(key) {}
|
|
LLPumpIO* mPump;
|
|
S32 mKey;
|
|
};
|
|
|
|
|
|
/**
|
|
* @struct ll_delete_apr_pollset_fd_client_data
|
|
* @brief This is a simple helper class to clean up our client data.
|
|
*/
|
|
struct ll_delete_apr_pollset_fd_client_data
|
|
{
|
|
typedef std::pair<LLIOPipe::ptr_t, apr_pollfd_t> pipe_conditional_t;
|
|
void operator()(const pipe_conditional_t& conditional)
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
S32* client_id = (S32*)conditional.second.client_data;
|
|
delete client_id;
|
|
}
|
|
};
|
|
|
|
/**
|
|
* LLPumpIO
|
|
*/
|
|
LLPumpIO::LLPumpIO(void) :
|
|
mState(LLPumpIO::NORMAL),
|
|
mRebuildPollset(false),
|
|
mPollset(NULL),
|
|
mPollsetClientID(0),
|
|
mNextLock(0),
|
|
mCurrentPoolReallocCount(0),
|
|
mChainsMutex(NULL),
|
|
mCallbackMutex(NULL),
|
|
mCurrentChain(mRunningChains.end())
|
|
{
|
|
mCurrentChain = mRunningChains.end();
|
|
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
initialize();
|
|
}
|
|
|
|
LLPumpIO::~LLPumpIO()
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
#if LL_THREADS_APR
|
|
if (mChainsMutex) apr_thread_mutex_destroy(mChainsMutex);
|
|
if (mCallbackMutex) apr_thread_mutex_destroy(mCallbackMutex);
|
|
#endif
|
|
mChainsMutex = NULL;
|
|
mCallbackMutex = NULL;
|
|
if(mPollset)
|
|
{
|
|
// lldebugs << "cleaning up pollset" << llendl;
|
|
apr_pollset_destroy(mPollset);
|
|
mPollset = NULL;
|
|
}
|
|
}
|
|
|
|
bool LLPumpIO::addChain(chain_t const& chain, F32 timeout)
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
|
|
chain_t::const_iterator it = chain.begin();
|
|
chain_t::const_iterator const end = chain.end();
|
|
if (it == end) return false;
|
|
|
|
LLChainInfo info;
|
|
for(; it != end; ++it)
|
|
{
|
|
if ((*it)->hasExpiration())
|
|
{
|
|
info.mHasExpiration = true;
|
|
break;
|
|
}
|
|
}
|
|
info.setTimeoutSeconds(timeout);
|
|
info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray);
|
|
LLLinkInfo link;
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
lldebugs << "LLPumpIO::addChain() " << chain[0] << " '"
|
|
<< typeid(*(chain[0])).name() << "'" << llendl;
|
|
#else
|
|
lldebugs << "LLPumpIO::addChain() " << chain[0] <<llendl;
|
|
#endif
|
|
it = chain.begin();
|
|
for(; it != end; ++it)
|
|
{
|
|
link.mPipe = (*it);
|
|
link.mChannels = info.mData->nextChannel();
|
|
info.mChainLinks.push_back(link);
|
|
}
|
|
#if LL_THREADS_APR
|
|
LLScopedLock lock(mChainsMutex);
|
|
#endif
|
|
mPendingChains.push_back(info);
|
|
return true;
|
|
}
|
|
|
|
bool LLPumpIO::addChain(
|
|
const LLPumpIO::links_t& links,
|
|
LLIOPipe::buffer_ptr_t data,
|
|
LLSD context,
|
|
F32 timeout)
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
|
|
// remember that if the caller is providing a full link
|
|
// description, we need to have that description matched to a
|
|
// particular buffer.
|
|
if(!data) return false;
|
|
if(links.empty()) return false;
|
|
|
|
#if LL_THREADS_APR
|
|
LLScopedLock lock(mChainsMutex);
|
|
#endif
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << " '"
|
|
<< typeid(*(links[0].mPipe)).name() << "'" << llendl;
|
|
#else
|
|
lldebugs << "LLPumpIO::addChain() " << links[0].mPipe << llendl;
|
|
#endif
|
|
LLChainInfo info;
|
|
info.setTimeoutSeconds(timeout);
|
|
info.mChainLinks = links;
|
|
info.mData = data;
|
|
info.mContext = context;
|
|
mPendingChains.push_back(info);
|
|
return true;
|
|
}
|
|
|
|
bool LLPumpIO::setTimeoutSeconds(F32 timeout)
|
|
{
|
|
// If no chain is running, return failure.
|
|
if(mRunningChains.end() == mCurrentChain)
|
|
{
|
|
return false;
|
|
}
|
|
(*mCurrentChain).setTimeoutSeconds(timeout);
|
|
return true;
|
|
}
|
|
|
|
void LLPumpIO::adjustTimeoutSeconds(F32 delta)
|
|
{
|
|
// Ensure a chain is running
|
|
if(mRunningChains.end() != mCurrentChain)
|
|
{
|
|
(*mCurrentChain).adjustTimeoutSeconds(delta);
|
|
}
|
|
}
|
|
|
|
static std::string events_2_string(apr_int16_t events)
|
|
{
|
|
std::ostringstream ostr;
|
|
if(events & APR_POLLIN)
|
|
{
|
|
ostr << "read,";
|
|
}
|
|
if(events & APR_POLLPRI)
|
|
{
|
|
ostr << "priority,";
|
|
}
|
|
if(events & APR_POLLOUT)
|
|
{
|
|
ostr << "write,";
|
|
}
|
|
if(events & APR_POLLERR)
|
|
{
|
|
ostr << "error,";
|
|
}
|
|
if(events & APR_POLLHUP)
|
|
{
|
|
ostr << "hangup,";
|
|
}
|
|
if(events & APR_POLLNVAL)
|
|
{
|
|
ostr << "invalid,";
|
|
}
|
|
return chop_tail_copy(ostr.str(), 1);
|
|
}
|
|
|
|
bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
if(!pipe) return false;
|
|
ll_debug_poll_fd("Set conditional", poll);
|
|
|
|
lldebugs << "Setting conditionals (" << (poll ? events_2_string(poll->reqevents) :"null")
|
|
<< ") "
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
<< "on pipe " << typeid(*pipe).name()
|
|
#endif
|
|
<< " at " << pipe << llendl;
|
|
|
|
// remove any matching poll file descriptors for this pipe.
|
|
LLIOPipe::ptr_t pipe_ptr(pipe);
|
|
LLChainInfo::conditionals_t::iterator it;
|
|
it = (*mCurrentChain).mDescriptors.begin();
|
|
while(it != (*mCurrentChain).mDescriptors.end())
|
|
{
|
|
LLChainInfo::pipe_conditional_t& value = (*it);
|
|
if(pipe_ptr == value.first)
|
|
{
|
|
ll_delete_apr_pollset_fd_client_data()(value);
|
|
it = (*mCurrentChain).mDescriptors.erase(it);
|
|
mRebuildPollset = true;
|
|
}
|
|
else
|
|
{
|
|
++it;
|
|
}
|
|
}
|
|
|
|
if(!poll)
|
|
{
|
|
mRebuildPollset = true;
|
|
return true;
|
|
}
|
|
LLChainInfo::pipe_conditional_t value;
|
|
value.first = pipe_ptr;
|
|
value.second = *poll;
|
|
value.second.rtnevents = 0;
|
|
if(!poll->p)
|
|
{
|
|
// each fd needs a pool to work with, so if one was
|
|
// not specified, use this pool.
|
|
value.second.p = (*mCurrentChain).mDescriptorsPool->operator()();
|
|
}
|
|
value.second.client_data = new S32(++mPollsetClientID);
|
|
(*mCurrentChain).mDescriptors.push_back(value);
|
|
mRebuildPollset = true;
|
|
return true;
|
|
}
|
|
|
|
S32 LLPumpIO::setLock()
|
|
{
|
|
// *NOTE: I do not think it is necessary to acquire a mutex here
|
|
// since this should only be called during the pump(), and should
|
|
// only change the running chain. Any other use of this method is
|
|
// incorrect usage. If it becomes necessary to acquire a lock
|
|
// here, be sure to lock here and call a protected method to get
|
|
// the lock, and sleepChain() should probably acquire the same
|
|
// lock while and calling the same protected implementation to
|
|
// lock the runner at the same time.
|
|
|
|
// If no chain is running, return failure.
|
|
if(mRunningChains.end() == mCurrentChain)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
// deal with wrap.
|
|
if(++mNextLock <= 0)
|
|
{
|
|
mNextLock = 1;
|
|
}
|
|
|
|
// set the lock
|
|
(*mCurrentChain).mLock = mNextLock;
|
|
return mNextLock;
|
|
}
|
|
|
|
void LLPumpIO::clearLock(S32 key)
|
|
{
|
|
// We need to lock it here since we do not want to be iterating
|
|
// over the chains twice. We can safely call process() while this
|
|
// is happening since we should not be erasing a locked pipe, and
|
|
// therefore won't be treading into deleted memory. I think we can
|
|
// also clear the lock on the chain safely since the pump only
|
|
// reads that value.
|
|
#if LL_THREADS_APR
|
|
LLScopedLock lock(mChainsMutex);
|
|
#endif
|
|
mClearLocks.insert(key);
|
|
}
|
|
|
|
bool LLPumpIO::sleepChain(F64 seconds)
|
|
{
|
|
// Much like the call to setLock(), this should only be called
|
|
// from one chain during processing, so there is no need to
|
|
// acquire a mutex.
|
|
if(seconds <= 0.0) return false;
|
|
S32 key = setLock();
|
|
if(!key) return false;
|
|
LLRunner::run_handle_t handle = mRunner.addRunnable(
|
|
LLChainSleeper::build(this, key),
|
|
LLRunner::RUN_IN,
|
|
seconds);
|
|
if(0 == handle) return false;
|
|
return true;
|
|
}
|
|
|
|
bool LLPumpIO::copyCurrentLinkInfo(links_t& links) const
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
if(mRunningChains.end() == mCurrentChain)
|
|
{
|
|
return false;
|
|
}
|
|
std::copy(
|
|
(*mCurrentChain).mChainLinks.begin(),
|
|
(*mCurrentChain).mChainLinks.end(),
|
|
std::back_insert_iterator<links_t>(links));
|
|
return true;
|
|
}
|
|
|
|
void LLPumpIO::pump()
|
|
{
|
|
pump(DEFAULT_POLL_TIMEOUT);
|
|
}
|
|
|
|
static LLFastTimer::DeclareTimer FTM_PUMP_IO("Pump IO");
|
|
|
|
LLPumpIO::current_chain_t LLPumpIO::removeRunningChain(LLPumpIO::current_chain_t& run_chain)
|
|
{
|
|
std::for_each(
|
|
(*run_chain).mDescriptors.begin(),
|
|
(*run_chain).mDescriptors.end(),
|
|
ll_delete_apr_pollset_fd_client_data());
|
|
return mRunningChains.erase(run_chain);
|
|
}
|
|
|
|
//timeout is in microseconds
|
|
void LLPumpIO::pump(const S32& poll_timeout)
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
LLFastTimer t1(FTM_PUMP_IO);
|
|
//llinfos << "LLPumpIO::pump()" << llendl;
|
|
|
|
// Run any pending runners.
|
|
mRunner.run();
|
|
|
|
// We need to move all of the pending heads over to the running
|
|
// chains.
|
|
PUMP_DEBUG;
|
|
if(true)
|
|
{
|
|
#if LL_THREADS_APR
|
|
LLScopedLock lock(mChainsMutex);
|
|
#endif
|
|
// bail if this pump is paused.
|
|
if(PAUSING == mState)
|
|
{
|
|
mState = PAUSED;
|
|
}
|
|
if(PAUSED == mState)
|
|
{
|
|
return;
|
|
}
|
|
|
|
PUMP_DEBUG;
|
|
// Move the pending chains over to the running chaings
|
|
if(!mPendingChains.empty())
|
|
{
|
|
PUMP_DEBUG;
|
|
//lldebugs << "Pushing " << mPendingChains.size() << "." << llendl;
|
|
std::copy(
|
|
mPendingChains.begin(),
|
|
mPendingChains.end(),
|
|
std::back_insert_iterator<running_chains_t>(mRunningChains));
|
|
mPendingChains.clear();
|
|
PUMP_DEBUG;
|
|
}
|
|
|
|
// Clear any locks. This needs to be done here so that we do
|
|
// not clash during a call to clearLock().
|
|
if(!mClearLocks.empty())
|
|
{
|
|
PUMP_DEBUG;
|
|
running_chains_t::iterator it = mRunningChains.begin();
|
|
running_chains_t::iterator end = mRunningChains.end();
|
|
std::set<S32>::iterator not_cleared = mClearLocks.end();
|
|
for(; it != end; ++it)
|
|
{
|
|
if((*it).mLock && mClearLocks.find((*it).mLock) != not_cleared)
|
|
{
|
|
(*it).mLock = 0;
|
|
}
|
|
}
|
|
PUMP_DEBUG;
|
|
mClearLocks.clear();
|
|
}
|
|
}
|
|
|
|
PUMP_DEBUG;
|
|
// rebuild the pollset if necessary
|
|
if(mRebuildPollset)
|
|
{
|
|
PUMP_DEBUG;
|
|
rebuildPollset();
|
|
mRebuildPollset = false;
|
|
}
|
|
|
|
// Poll based on the last known pollset
|
|
// *TODO: may want to pass in a poll timeout so it works correctly
|
|
// in single and multi threaded processes.
|
|
PUMP_DEBUG;
|
|
typedef std::map<S32, S32> signal_client_t;
|
|
signal_client_t signalled_client;
|
|
const apr_pollfd_t* poll_fd = NULL;
|
|
if(mPollset)
|
|
{
|
|
PUMP_DEBUG;
|
|
//llinfos << "polling" << llendl;
|
|
S32 count = 0;
|
|
S32 client_id = 0;
|
|
{
|
|
LLPerfBlock polltime("pump_poll");
|
|
apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd);
|
|
}
|
|
PUMP_DEBUG;
|
|
for(S32 ii = 0; ii < count; ++ii)
|
|
{
|
|
ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]);
|
|
client_id = *((S32*)poll_fd[ii].client_data);
|
|
signalled_client[client_id] = ii;
|
|
}
|
|
PUMP_DEBUG;
|
|
}
|
|
|
|
PUMP_DEBUG;
|
|
// set up for a check to see if each one was signalled
|
|
signal_client_t::iterator not_signalled = signalled_client.end();
|
|
|
|
// Process everything as appropriate
|
|
//lldebugs << "Running chain count: " << mRunningChains.size() << llendl;
|
|
running_chains_t::iterator run_chain = mRunningChains.begin();
|
|
bool process_this_chain = false;
|
|
while( run_chain != mRunningChains.end() )
|
|
{
|
|
PUMP_DEBUG;
|
|
if((*run_chain).mInit
|
|
&& (*run_chain).mTimer.getStarted()
|
|
&& (*run_chain).mTimer.hasExpired())
|
|
{
|
|
PUMP_DEBUG;
|
|
if(handleChainError(*run_chain, LLIOPipe::STATUS_EXPIRED))
|
|
{
|
|
// the pipe probably handled the error. If the handler
|
|
// forgot to reset the expiration then we need to do
|
|
// that here.
|
|
if((*run_chain).mTimer.getStarted()
|
|
&& (*run_chain).mTimer.hasExpired())
|
|
{
|
|
PUMP_DEBUG;
|
|
llinfos << "Error handler forgot to reset timeout. "
|
|
<< "Resetting to " << DEFAULT_CHAIN_EXPIRY_SECS
|
|
<< " seconds." << llendl;
|
|
(*run_chain).setTimeoutSeconds(DEFAULT_CHAIN_EXPIRY_SECS);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
PUMP_DEBUG;
|
|
// it timed out and no one handled it, so we need to
|
|
// retire the chain
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
lldebugs << "Removing chain "
|
|
<< (*run_chain).mChainLinks[0].mPipe
|
|
<< " '"
|
|
<< typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
|
|
<< "' because it timed out." << llendl;
|
|
#else
|
|
// lldebugs << "Removing chain "
|
|
// << (*run_chain).mChainLinks[0].mPipe
|
|
// << " because we reached the end." << llendl;
|
|
#endif
|
|
run_chain = removeRunningChain(run_chain);
|
|
continue;
|
|
}
|
|
}
|
|
else if(isChainExpired(*run_chain))
|
|
{
|
|
run_chain = removeRunningChain(run_chain);
|
|
continue;
|
|
}
|
|
|
|
PUMP_DEBUG;
|
|
if((*run_chain).mLock)
|
|
{
|
|
++run_chain;
|
|
continue;
|
|
}
|
|
PUMP_DEBUG;
|
|
mCurrentChain = run_chain;
|
|
|
|
if((*run_chain).mDescriptors.empty())
|
|
{
|
|
// if there are no conditionals, just process this chain.
|
|
process_this_chain = true;
|
|
//lldebugs << "no conditionals - processing" << llendl;
|
|
}
|
|
else
|
|
{
|
|
PUMP_DEBUG;
|
|
//lldebugs << "checking conditionals" << llendl;
|
|
// Check if this run chain was signalled. If any file
|
|
// descriptor is ready for something, then go ahead and
|
|
// process this chian.
|
|
process_this_chain = false;
|
|
if(!signalled_client.empty())
|
|
{
|
|
PUMP_DEBUG;
|
|
LLChainInfo::conditionals_t::iterator it;
|
|
it = (*run_chain).mDescriptors.begin();
|
|
LLChainInfo::conditionals_t::iterator end;
|
|
end = (*run_chain).mDescriptors.end();
|
|
S32 client_id = 0;
|
|
signal_client_t::iterator signal;
|
|
for(; it != end; ++it)
|
|
{
|
|
PUMP_DEBUG;
|
|
client_id = *((S32*)((*it).second.client_data));
|
|
signal = signalled_client.find(client_id);
|
|
if (signal == not_signalled) continue;
|
|
static const apr_int16_t POLL_CHAIN_ERROR =
|
|
APR_POLLHUP | APR_POLLNVAL | APR_POLLERR;
|
|
const apr_pollfd_t* poll = &(poll_fd[(*signal).second]);
|
|
if(poll->rtnevents & POLL_CHAIN_ERROR)
|
|
{
|
|
// Potential eror condition has been
|
|
// returned. If HUP was one of them, we pass
|
|
// that as the error even though there may be
|
|
// more. If there are in fact more errors,
|
|
// we'll just wait for that detection until
|
|
// the next pump() cycle to catch it so that
|
|
// the logic here gets no more strained than
|
|
// it already is.
|
|
LLIOPipe::EStatus error_status;
|
|
if(poll->rtnevents & APR_POLLHUP)
|
|
error_status = LLIOPipe::STATUS_LOST_CONNECTION;
|
|
else
|
|
error_status = LLIOPipe::STATUS_ERROR;
|
|
if(handleChainError(*run_chain, error_status)) break;
|
|
ll_debug_poll_fd("Removing pipe", poll);
|
|
llwarns << "Removing pipe "
|
|
<< (*run_chain).mChainLinks[0].mPipe
|
|
<< " '"
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
<< typeid(
|
|
*((*run_chain).mChainLinks[0].mPipe)).name()
|
|
#endif
|
|
<< "' because: "
|
|
<< events_2_string(poll->rtnevents)
|
|
<< llendl;
|
|
(*run_chain).mHead = (*run_chain).mChainLinks.end();
|
|
break;
|
|
}
|
|
|
|
// at least 1 fd got signalled, and there were no
|
|
// errors. That means we process this chain.
|
|
process_this_chain = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if(process_this_chain)
|
|
{
|
|
PUMP_DEBUG;
|
|
if(!((*run_chain).mInit))
|
|
{
|
|
(*run_chain).mHead = (*run_chain).mChainLinks.begin();
|
|
(*run_chain).mInit = true;
|
|
}
|
|
PUMP_DEBUG;
|
|
processChain(*run_chain);
|
|
}
|
|
|
|
PUMP_DEBUG;
|
|
if((*run_chain).mHead == (*run_chain).mChainLinks.end())
|
|
{
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
|
|
<< " '"
|
|
<< typeid(*((*run_chain).mChainLinks[0].mPipe)).name()
|
|
<< "' because we reached the end." << llendl;
|
|
#else
|
|
// lldebugs << "Removing chain " << (*run_chain).mChainLinks[0].mPipe
|
|
// << " because we reached the end." << llendl;
|
|
#endif
|
|
|
|
PUMP_DEBUG;
|
|
// This chain is done. Clean up any allocated memory and
|
|
// erase the chain info.
|
|
run_chain = removeRunningChain(run_chain);
|
|
|
|
// *NOTE: may not always need to rebuild the pollset.
|
|
mRebuildPollset = true;
|
|
}
|
|
else
|
|
{
|
|
PUMP_DEBUG;
|
|
// this chain needs more processing - just go to the next
|
|
// chain.
|
|
++run_chain;
|
|
}
|
|
}
|
|
|
|
PUMP_DEBUG;
|
|
// null out the chain
|
|
mCurrentChain = mRunningChains.end();
|
|
END_PUMP_DEBUG;
|
|
}
|
|
|
|
//bool LLPumpIO::respond(const chain_t& pipes)
|
|
//{
|
|
//#if LL_THREADS_APR
|
|
// LLScopedLock lock(mCallbackMutex);
|
|
//#endif
|
|
// LLChainInfo info;
|
|
// links_t links;
|
|
//
|
|
// mPendingCallbacks.push_back(info);
|
|
// return true;
|
|
//}
|
|
|
|
bool LLPumpIO::respond(LLIOPipe* pipe)
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
if(NULL == pipe) return false;
|
|
|
|
#if LL_THREADS_APR
|
|
LLScopedLock lock(mCallbackMutex);
|
|
#endif
|
|
LLChainInfo info;
|
|
LLLinkInfo link;
|
|
link.mPipe = pipe;
|
|
info.mChainLinks.push_back(link);
|
|
mPendingCallbacks.push_back(info);
|
|
return true;
|
|
}
|
|
|
|
bool LLPumpIO::respond(
|
|
const links_t& links,
|
|
LLIOPipe::buffer_ptr_t data,
|
|
LLSD context)
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
// if the caller is providing a full link description, we need to
|
|
// have that description matched to a particular buffer.
|
|
if(!data) return false;
|
|
if(links.empty()) return false;
|
|
|
|
#if LL_THREADS_APR
|
|
LLScopedLock lock(mCallbackMutex);
|
|
#endif
|
|
|
|
// Add the callback response
|
|
LLChainInfo info;
|
|
info.mChainLinks = links;
|
|
info.mData = data;
|
|
info.mContext = context;
|
|
mPendingCallbacks.push_back(info);
|
|
return true;
|
|
}
|
|
|
|
static LLFastTimer::DeclareTimer FTM_PUMP_CALLBACK_CHAIN("Chain");
|
|
|
|
void LLPumpIO::callback()
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
//llinfos << "LLPumpIO::callback()" << llendl;
|
|
if(true)
|
|
{
|
|
#if LL_THREADS_APR
|
|
LLScopedLock lock(mCallbackMutex);
|
|
#endif
|
|
std::copy(
|
|
mPendingCallbacks.begin(),
|
|
mPendingCallbacks.end(),
|
|
std::back_insert_iterator<callbacks_t>(mCallbacks));
|
|
mPendingCallbacks.clear();
|
|
}
|
|
if(!mCallbacks.empty())
|
|
{
|
|
callbacks_t::iterator it = mCallbacks.begin();
|
|
callbacks_t::iterator end = mCallbacks.end();
|
|
for(; it != end; ++it)
|
|
{
|
|
LLFastTimer t(FTM_PUMP_CALLBACK_CHAIN);
|
|
// it's always the first and last time for respone chains
|
|
(*it).mHead = (*it).mChainLinks.begin();
|
|
(*it).mInit = true;
|
|
(*it).mEOS = true;
|
|
processChain(*it);
|
|
}
|
|
mCallbacks.clear();
|
|
}
|
|
}
|
|
|
|
void LLPumpIO::control(LLPumpIO::EControl op)
|
|
{
|
|
#if LL_THREADS_APR
|
|
LLScopedLock lock(mChainsMutex);
|
|
#endif
|
|
switch(op)
|
|
{
|
|
case PAUSE:
|
|
mState = PAUSING;
|
|
break;
|
|
case RESUME:
|
|
mState = NORMAL;
|
|
break;
|
|
default:
|
|
// no-op
|
|
break;
|
|
}
|
|
}
|
|
|
|
void LLPumpIO::initialize(void)
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
mPool.create();
|
|
#if LL_THREADS_APR
|
|
// SJB: Windows defaults to NESTED and OSX defaults to UNNESTED, so use UNNESTED explicitly.
|
|
apr_thread_mutex_create(&mChainsMutex, APR_THREAD_MUTEX_UNNESTED, mPool());
|
|
apr_thread_mutex_create(&mCallbackMutex, APR_THREAD_MUTEX_UNNESTED, mPool());
|
|
#endif
|
|
}
|
|
|
|
void LLPumpIO::rebuildPollset()
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
// lldebugs << "LLPumpIO::rebuildPollset()" << llendl;
|
|
if(mPollset)
|
|
{
|
|
//lldebugs << "destroying pollset" << llendl;
|
|
apr_pollset_destroy(mPollset);
|
|
mPollset = NULL;
|
|
}
|
|
U32 size = 0;
|
|
running_chains_t::iterator run_it = mRunningChains.begin();
|
|
running_chains_t::iterator run_end = mRunningChains.end();
|
|
for(; run_it != run_end; ++run_it)
|
|
{
|
|
size += (*run_it).mDescriptors.size();
|
|
}
|
|
//lldebugs << "found " << size << " descriptors." << llendl;
|
|
if(size)
|
|
{
|
|
// Recycle the memory pool
|
|
const S32 POLLSET_POOL_RECYCLE_COUNT = 100;
|
|
if(mCurrentPool
|
|
&& (0 == (++mCurrentPoolReallocCount % POLLSET_POOL_RECYCLE_COUNT)))
|
|
{
|
|
mCurrentPool.destroy();
|
|
mCurrentPoolReallocCount = 0;
|
|
}
|
|
if(!mCurrentPool)
|
|
{
|
|
mCurrentPool.create(mPool);
|
|
}
|
|
|
|
// add all of the file descriptors
|
|
run_it = mRunningChains.begin();
|
|
LLChainInfo::conditionals_t::iterator fd_it;
|
|
LLChainInfo::conditionals_t::iterator fd_end;
|
|
apr_pollset_create(&mPollset, size, mCurrentPool(), 0);
|
|
for(; run_it != run_end; ++run_it)
|
|
{
|
|
fd_it = (*run_it).mDescriptors.begin();
|
|
fd_end = (*run_it).mDescriptors.end();
|
|
for(; fd_it != fd_end; ++fd_it)
|
|
{
|
|
apr_pollset_add(mPollset, &((*fd_it).second));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void LLPumpIO::processChain(LLChainInfo& chain)
|
|
{
|
|
PUMP_DEBUG;
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
LLIOPipe::EStatus status = LLIOPipe::STATUS_OK;
|
|
links_t::iterator it = chain.mHead;
|
|
links_t::iterator end = chain.mChainLinks.end();
|
|
bool need_process_signaled = false;
|
|
bool keep_going = true;
|
|
do
|
|
{
|
|
#if LL_DEBUG_PROCESS_LINK
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
llinfos << "Processing " << typeid(*((*it).mPipe)).name() << "."
|
|
<< llendl;
|
|
#else
|
|
llinfos << "Processing link " << (*it).mPipe << "." << llendl;
|
|
#endif
|
|
#endif
|
|
#if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN
|
|
if(chain.mData)
|
|
{
|
|
char* buf = NULL;
|
|
S32 bytes = chain.mData->countAfter((*it).mChannels.in(), NULL);
|
|
if(bytes)
|
|
{
|
|
buf = new char[bytes + 1];
|
|
chain.mData->readAfter(
|
|
(*it).mChannels.in(),
|
|
NULL,
|
|
(U8*)buf,
|
|
bytes);
|
|
buf[bytes] = '\0';
|
|
llinfos << "CHANNEL IN(" << (*it).mChannels.in() << "): "
|
|
<< buf << llendl;
|
|
delete[] buf;
|
|
buf = NULL;
|
|
}
|
|
else
|
|
{
|
|
llinfos << "CHANNEL IN(" << (*it).mChannels.in()<< "): (null)"
|
|
<< llendl;
|
|
}
|
|
}
|
|
#endif
|
|
PUMP_DEBUG;
|
|
status = (*it).mPipe->process(
|
|
(*it).mChannels,
|
|
chain.mData,
|
|
chain.mEOS,
|
|
chain.mContext,
|
|
this);
|
|
#if LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT
|
|
if(chain.mData)
|
|
{
|
|
char* buf = NULL;
|
|
S32 bytes = chain.mData->countAfter((*it).mChannels.out(), NULL);
|
|
if(bytes)
|
|
{
|
|
buf = new char[bytes + 1];
|
|
chain.mData->readAfter(
|
|
(*it).mChannels.out(),
|
|
NULL,
|
|
(U8*)buf,
|
|
bytes);
|
|
buf[bytes] = '\0';
|
|
llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): "
|
|
<< buf << llendl;
|
|
delete[] buf;
|
|
buf = NULL;
|
|
}
|
|
else
|
|
{
|
|
llinfos << "CHANNEL OUT(" << (*it).mChannels.out()<< "): (null)"
|
|
<< llendl;
|
|
}
|
|
}
|
|
#endif
|
|
|
|
#if LL_DEBUG_PROCESS_RETURN_VALUE
|
|
// Only bother with the success codes - error codes are logged
|
|
// below.
|
|
if(LLIOPipe::isSuccess(status))
|
|
{
|
|
llinfos << "Pipe returned: '"
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
<< typeid(*((*it).mPipe)).name() << "':'"
|
|
#endif
|
|
<< LLIOPipe::lookupStatusString(status) << "'" << llendl;
|
|
}
|
|
#endif
|
|
|
|
PUMP_DEBUG;
|
|
switch(status)
|
|
{
|
|
case LLIOPipe::STATUS_OK:
|
|
// no-op
|
|
break;
|
|
case LLIOPipe::STATUS_STOP:
|
|
PUMP_DEBUG;
|
|
status = LLIOPipe::STATUS_OK;
|
|
chain.mHead = end;
|
|
keep_going = false;
|
|
break;
|
|
case LLIOPipe::STATUS_DONE:
|
|
PUMP_DEBUG;
|
|
status = LLIOPipe::STATUS_OK;
|
|
chain.mHead = (it + 1);
|
|
chain.mEOS = true;
|
|
break;
|
|
case LLIOPipe::STATUS_BREAK:
|
|
PUMP_DEBUG;
|
|
status = LLIOPipe::STATUS_OK;
|
|
keep_going = false;
|
|
break;
|
|
case LLIOPipe::STATUS_NEED_PROCESS:
|
|
PUMP_DEBUG;
|
|
status = LLIOPipe::STATUS_OK;
|
|
if(!need_process_signaled)
|
|
{
|
|
need_process_signaled = true;
|
|
chain.mHead = it;
|
|
}
|
|
break;
|
|
default:
|
|
PUMP_DEBUG;
|
|
if(LLIOPipe::isError(status))
|
|
{
|
|
llinfos << "Pump generated pipe err: '"
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
<< typeid(*((*it).mPipe)).name() << "':'"
|
|
#endif
|
|
<< LLIOPipe::lookupStatusString(status)
|
|
<< "'" << llendl;
|
|
#if LL_DEBUG_SPEW_BUFFER_CHANNEL_IN_ON_ERROR
|
|
if(chain.mData)
|
|
{
|
|
char* buf = NULL;
|
|
S32 bytes = chain.mData->countAfter(
|
|
(*it).mChannels.in(),
|
|
NULL);
|
|
if(bytes)
|
|
{
|
|
buf = new char[bytes + 1];
|
|
chain.mData->readAfter(
|
|
(*it).mChannels.in(),
|
|
NULL,
|
|
(U8*)buf,
|
|
bytes);
|
|
buf[bytes] = '\0';
|
|
llinfos << "Input After Error: " << buf << llendl;
|
|
delete[] buf;
|
|
buf = NULL;
|
|
}
|
|
else
|
|
{
|
|
llinfos << "Input After Error: (null)" << llendl;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
llinfos << "Input After Error: (null)" << llendl;
|
|
}
|
|
#endif
|
|
keep_going = false;
|
|
chain.mHead = it;
|
|
if(!handleChainError(chain, status))
|
|
{
|
|
chain.mHead = end;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
llinfos << "Unhandled status code: " << status << ":"
|
|
<< LLIOPipe::lookupStatusString(status) << llendl;
|
|
}
|
|
break;
|
|
}
|
|
PUMP_DEBUG;
|
|
} while(keep_going && (++it != end));
|
|
PUMP_DEBUG;
|
|
}
|
|
|
|
bool LLPumpIO::isChainExpired(LLChainInfo& chain)
|
|
{
|
|
if(!chain.mHasExpiration)
|
|
{
|
|
return false ;
|
|
}
|
|
|
|
for(links_t::iterator iter = chain.mChainLinks.begin(); iter != chain.mChainLinks.end(); ++iter)
|
|
{
|
|
if(!(*iter).mPipe->hasNotExpired())
|
|
{
|
|
return true ;
|
|
}
|
|
}
|
|
|
|
return false ;
|
|
}
|
|
|
|
bool LLPumpIO::handleChainError(
|
|
LLChainInfo& chain,
|
|
LLIOPipe::EStatus error)
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
links_t::reverse_iterator rit;
|
|
if(chain.mHead == chain.mChainLinks.end())
|
|
{
|
|
rit = links_t::reverse_iterator(chain.mHead);
|
|
}
|
|
else
|
|
{
|
|
rit = links_t::reverse_iterator(chain.mHead + 1);
|
|
}
|
|
|
|
links_t::reverse_iterator rend = chain.mChainLinks.rend();
|
|
bool handled = false;
|
|
bool keep_going = true;
|
|
do
|
|
{
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
lldebugs << "Passing error to " << typeid(*((*rit).mPipe)).name()
|
|
<< "." << llendl;
|
|
#endif
|
|
error = (*rit).mPipe->handleError(error, this);
|
|
switch(error)
|
|
{
|
|
case LLIOPipe::STATUS_OK:
|
|
handled = true;
|
|
chain.mHead = rit.base();
|
|
break;
|
|
case LLIOPipe::STATUS_STOP:
|
|
case LLIOPipe::STATUS_DONE:
|
|
case LLIOPipe::STATUS_BREAK:
|
|
case LLIOPipe::STATUS_NEED_PROCESS:
|
|
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
|
lldebugs << "Pipe " << typeid(*((*rit).mPipe)).name()
|
|
<< " returned code to stop error handler." << llendl;
|
|
#endif
|
|
keep_going = false;
|
|
break;
|
|
case LLIOPipe::STATUS_EXPIRED:
|
|
keep_going = false;
|
|
break ;
|
|
default:
|
|
if(LLIOPipe::isSuccess(error))
|
|
{
|
|
llinfos << "Unhandled status code: " << error << ":"
|
|
<< LLIOPipe::lookupStatusString(error) << llendl;
|
|
error = LLIOPipe::STATUS_ERROR;
|
|
keep_going = false;
|
|
}
|
|
break;
|
|
}
|
|
} while(keep_going && !handled && (++rit != rend));
|
|
return handled;
|
|
}
|
|
|
|
/**
|
|
* LLPumpIO::LLChainInfo
|
|
*/
|
|
|
|
LLPumpIO::LLChainInfo::LLChainInfo() :
|
|
mInit(false),
|
|
mLock(0),
|
|
mEOS(false),
|
|
mHasExpiration(false),
|
|
mDescriptorsPool(new LLAPRPool(LLThread::tldata().mRootPool))
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
mTimer.setTimerExpirySec(DEFAULT_CHAIN_EXPIRY_SECS);
|
|
}
|
|
|
|
void LLPumpIO::LLChainInfo::setTimeoutSeconds(F32 timeout)
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
if(timeout > 0.0f)
|
|
{
|
|
mTimer.start(timeout);
|
|
}
|
|
else
|
|
{
|
|
mTimer.stop();
|
|
}
|
|
}
|
|
|
|
void LLPumpIO::LLChainInfo::adjustTimeoutSeconds(F32 delta)
|
|
{
|
|
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
|
|
if(mTimer.getStarted())
|
|
{
|
|
F64 expiry = mTimer.expiresAt();
|
|
expiry += delta;
|
|
mTimer.setExpiryAt(expiry);
|
|
}
|
|
}
|