Threading voodoo: allow multiple concurrent calls to set_state().

This patch prepares AIStateMachine for the use of AITimer together
with calls to set_state() from other threads. The extra problem
in this case is that the main-thread CAN start running the state
machine again (when the timer times out), while before it was
assumed to be idle until a thread called set_state.

This also takes into account that a thread might call set_state()
and then AGAIN call set_state() before the main thread gets the
chance to call idle() inbetween.
This commit is contained in:
Aleric Inglewood
2012-07-09 04:19:28 +02:00
parent 744563a150
commit f012f664d2
5 changed files with 160 additions and 40 deletions

View File

@@ -310,11 +310,13 @@ void LLThread::wakeLocked()
}
}
#ifdef SHOW_ASSERT
// This allows the use of llassert(is_main_thread()) to assure the current thread is the main thread.
static apr_os_thread_t main_thread_id;
LL_COMMON_API bool is_main_thread(void) { return apr_os_thread_equal(main_thread_id, apr_os_thread_current()); }
#endif
//static
apr_os_thread_t LLThread::sMainThreadID;
void LLThread::set_main_thread_id(void)
{
sMainThreadID = apr_os_thread_current();
}
// The thread private handle to access the LLThreadLocalData instance.
apr_threadkey_t* LLThreadLocalData::sThreadLocalDataKey;
@@ -346,10 +348,8 @@ void LLThreadLocalData::init(void)
// Create the thread-local data for the main thread (this function is called by the main thread).
LLThreadLocalData::create(NULL);
#ifdef SHOW_ASSERT
// This function is called by the main thread.
main_thread_id = apr_os_thread_current();
#endif
LLThread::set_main_thread_id();
}
// This is called once for every thread when the thread is destructed.

View File

@@ -40,13 +40,6 @@
#include "llaprpool.h"
#include "llatomic.h"
#ifdef SHOW_ASSERT
extern LL_COMMON_API bool is_main_thread(void);
#define ASSERT_SINGLE_THREAD do { static apr_os_thread_t first_thread_id = apr_os_thread_current(); llassert(apr_os_thread_equal(first_thread_id, apr_os_thread_current())); } while(0)
#else
#define ASSERT_SINGLE_THREAD do { } while(0)
#endif
class LLThread;
class LLMutex;
class LLCondition;
@@ -89,6 +82,7 @@ private:
class LL_COMMON_API LLThread
{
private:
static apr_os_thread_t sMainThreadID;
static U32 sIDIter;
static LLAtomicS32 sCount;
static LLAtomicS32 sRunning;
@@ -112,6 +106,7 @@ public:
static S32 getCount() { return sCount; }
static S32 getRunning() { return sRunning; }
static void yield(); // Static because it can be called by the main thread, which doesn't have an LLThread data structure.
static bool is_main_thread(void) { return apr_os_thread_equal(LLThread::sMainThreadID, apr_os_thread_current()); }
public:
// PAUSE / RESUME functionality. See source code for important usage notes.
@@ -135,6 +130,9 @@ public:
// Return thread-local data for the current thread.
static LLThreadLocalData& tldata(void) { return LLThreadLocalData::tldata(); }
// Called once, from LLThreadLocalData::init().
static void set_main_thread_id(void);
U32 getID() const { return mID; }
private:
@@ -178,6 +176,13 @@ protected:
// mRunCondition->unlock();
};
#ifdef SHOW_ASSERT
LL_COMMON_API inline bool is_main_thread(void) { return LLThread::is_main_thread(); }
#define ASSERT_SINGLE_THREAD do { static apr_os_thread_t first_thread_id = apr_os_thread_current(); llassert(apr_os_thread_equal(first_thread_id, apr_os_thread_current())); } while(0)
#else
#define ASSERT_SINGLE_THREAD do { } while(0)
#endif
//============================================================================
#define MUTEX_DEBUG (LL_DEBUG || LL_RELEASE_WITH_DEBUG_INFO)

View File

@@ -34,7 +34,7 @@
enum curleasyrequeststatemachine_state_type {
AICurlEasyRequestStateMachine_addRequest = AIStateMachine::max_state,
AICurlEasyRequestStateMachine_waitAdded,
AICurlEasyRequestStateMachine_waitFinished,
AICurlEasyRequestStateMachine_waitRemoved,
AICurlEasyRequestStateMachine_finished
};
@@ -44,7 +44,7 @@ char const* AICurlEasyRequestStateMachine::state_str_impl(state_type run_state)
{
AI_CASE_RETURN(AICurlEasyRequestStateMachine_addRequest);
AI_CASE_RETURN(AICurlEasyRequestStateMachine_waitAdded);
AI_CASE_RETURN(AICurlEasyRequestStateMachine_waitFinished);
AI_CASE_RETURN(AICurlEasyRequestStateMachine_waitRemoved);
AI_CASE_RETURN(AICurlEasyRequestStateMachine_finished);
}
return "UNKNOWN STATE";
@@ -63,7 +63,7 @@ void AICurlEasyRequestStateMachine::initialize_impl(void)
// CURL-THREAD
void AICurlEasyRequestStateMachine::added_to_multi_handle(AICurlEasyRequest_wat&)
{
set_state(AICurlEasyRequestStateMachine_waitFinished);
set_state(AICurlEasyRequestStateMachine_waitRemoved);
}
// CURL-THREAD
@@ -83,17 +83,17 @@ void AICurlEasyRequestStateMachine::multiplex_impl(void)
{
case AICurlEasyRequestStateMachine_addRequest:
{
mCurlEasyRequest.addRequest();
set_state(AICurlEasyRequestStateMachine_waitAdded);
}
case AICurlEasyRequestStateMachine_waitAdded:
{
idle(); // Wait till AICurlEasyRequestStateMachine::added_to_multi_handle() is called.
idle(AICurlEasyRequestStateMachine_waitAdded); // Wait till AICurlEasyRequestStateMachine::added_to_multi_handle() is called.
// Only AFTER going idle, add request to curl thread; this is needed because calls to set_state() are
// ignored when the statemachine is not idle, and theoretically the callbacks could be called
// immediately after this call.
mCurlEasyRequest.addRequest();
break;
}
case AICurlEasyRequestStateMachine_waitFinished:
case AICurlEasyRequestStateMachine_waitRemoved:
{
idle(); // Wait till AICurlEasyRequestStateMachine::finished() is called.
idle(AICurlEasyRequestStateMachine_waitRemoved); // Wait till AICurlEasyRequestStateMachine::removed_from_multi_handle() is called.
break;
}
case AICurlEasyRequestStateMachine_finished:

View File

@@ -152,37 +152,74 @@ void AIStateMachine::run(callback_type::signal_type::slot_type const& slot)
void AIStateMachine::idle(void)
{
DoutEntering(dc::statemachine, "AIStateMachine::idle() [" << (void*)this << "]");
llassert(is_main_thread());
llassert(!mIdle);
mIdle = true;
mSleep = 0;
#ifdef SHOW_ASSERT
mCalledThreadUnsafeIdle = true;
#endif
}
void AIStateMachine::idle(state_type current_run_state)
{
DoutEntering(dc::statemachine, "AIStateMachine::idle() [" << (void*)this << "]");
llassert(is_main_thread());
llassert(!mIdle);
mSetStateLock.lock();
// Only go idle if the run state is (still) what we expect it to be.
// Otherwise assume that another thread called set_state() and continue running.
if (current_run_state == mRunState)
{
mIdle = true;
mSleep = 0;
}
mSetStateLock.unlock();
}
// About thread safeness:
//
// The main thread initializes a statemachine and calls run, so a statemachine
// runs in the main thread. However, it is allowed that a state calls idle()
// and then allows one and only one other thread to call cont() upon some
// and then allows one or more other threads to call cont() upon some
// event (only once, of course, as idle() has to be called before cont()
// can be called again-- and another thread is not allowed to call idle()).
// Instead of cont(), the other thread may also call set_state().
void AIStateMachine::cont(void)
// can be called again-- and a non-main thread is not allowed to call idle()).
// Instead of cont() one may also call set_state().
// Of course, this may give rise to a race condition; if that happens then
// the thread that calls cont() (set_state()) first is serviced, and the other
// thread(s) are ignored, as if they never called cont().
void AIStateMachine::locked_cont(void)
{
DoutEntering(dc::statemachine, "AIStateMachine::cont() [" << (void*)this << "]");
llassert(mIdle);
// Atomic test mActive and change mIdle.
mIdleActive.lock();
#ifdef SHOW_ASSERT
mContThread = apr_os_thread_current();
#endif
mIdle = false;
bool not_active = mActive == as_idle;
mIdleActive.unlock();
// mActive is only changed in AIStateMachine::mainloop, by the main-thread, and
// here, possibly by any thread. However, after setting mIdle to false above, it
// is impossible for any thread to come here, until after the main-thread called
// idle(). So, if this is the main thread then that certainly isn't going to
// happen until we left this function, while if this is another thread and the
// state machine is already running in the main thread then not_active is false
// and we're already at the end of this function.
// If not_active is true then main-thread is not running this statemachine.
// It might call cont() (or set_state()) but never locked_cont(), and will never
// start actually running until we are done here and release the lock on
// continued_statemachines_and_calling_mainloop again. It is therefore safe
// to release mSetStateLock here, with as advantage that if we're not the main-
// thread and not_active is true, then the main-thread won't block when it has
// a timer running that times out and calls set_state().
mSetStateLock.unlock();
if (not_active)
{
AIWriteAccess<cscm_type> cscm_w(continued_statemachines_and_calling_mainloop);
// We only get here when the statemachine was idle (set by the main thread),
// see first assertion. Hence, the main thread is not changing this, as the
// statemachine is not running. Thus, mActive can have changed when a THIRD
// thread called cont(), which is not allowed: if two threads can call cont()
// at any moment then the first assertion can't hold.
// See above: it is not possible that mActive was changed since not_active
// was set to true above.
llassert_always(mActive == as_idle);
cscm_w->continued_statemachines.push_back(this);
if (!cscm_w->calling_mainloop)
@@ -192,21 +229,77 @@ void AIStateMachine::cont(void)
gIdleCallbacks.addFunction(&AIStateMachine::mainloop);
}
mActive = as_queued;
llassert_always(!mIdle); // It should never happen that one thread calls cont() while another calls idle() concurrently.
llassert_always(!mIdle); // It should never happen that the main thread calls idle(), while another thread calls cont() concurrently.
}
}
void AIStateMachine::set_state(state_type state)
{
DoutEntering(dc::statemachine, "AIStateMachine::set_state(" << state_str(state) << ") [" << (void*)this << "]");
// Do not call abort(), finish() or kill() before cancelling the possibility that other threads call set_state().
// Do not call set_state() unless running.
llassert(mState == bs_run);
// Do not call idle() when set_state is called from another thread.
llassert(!mCalledThreadUnsafeIdle || LLThread::is_main_thread());
// If this function is called from another thread than the main thread, then we have to ignore
// it if we're not idle and the state is less than the current state. The main thread must
// be able to change the state to anything (also smaller values). Note that that only can work
// if the main thread itself at all times cancels thread callbacks that call set_state()
// before calling idle() again!
//
// Thus: main thead calls idle(), and tells one or more threads to do callbacks on events,
// which (might) call set_state(). If the main thread calls set_state first (currently only
// possible as a result of the use of a timer) it will set mIdle to false (here) then cancel
// the call backs from the other threads and only then call idle() again.
// Thus if you want other threads get here while mIdle is false to be ignored then the
// main thread should use a large value for the new run state.
//
// If a non-main thread calls set_state first, then the state is changed but the main thread
// can still override it if it calls set_state before handling the new state; in the latter
// case it would still be as if the call from the non-main thread was ignored.
//
// Concurrent calls from non-main threads however, always result in the largest state
// to prevail.
// Stop race condition of multiple threads calling cont() or set_state() here.
mSetStateLock.lock();
// If the state machine is already running, and we are not the main-thread and the new
// state is less than the current state, ignore it.
if (!mIdle && !LLThread::is_main_thread() && state <= mRunState)
{
#ifdef SHOW_ASSERT
// It's a bit weird if the same thread does two calls on a row where the second call
// has a smaller value: warn about that.
if (mContThread == apr_os_thread_current())
{
llwarns << "Ignoring call to set_state(" << state_str(state) <<
") by non-main thread before main-thread could react on previous call, "
"because new state is smaller than old state (" << state_str(mRunState) << ")." << llendl;
}
#endif
mSetStateLock.unlock();
return; // Ignore.
}
// Change mRunState to the requested value.
if (mRunState != state)
{
mRunState = state;
Dout(dc::statemachine, "mRunState set to " << state_str(mRunState));
}
// Continue the state machine if appropriate.
if (mIdle)
cont();
locked_cont(); // This unlocks mSetStateLock.
else
mSetStateLock.unlock();
// If we get here then mIdle is false, so only mRunState can still be changed but we won't
// call locked_cont() anymore. When the main thread finally picks up on the state change,
// it will cancel any possible callbacks from other threads and process the largest state
// that this function was called with in the meantime.
}
void AIStateMachine::abort(void)

View File

@@ -207,6 +207,11 @@ class AIStateMachine {
active_type mActive; //!< Whether statemachine is idle, queued to be added to the active list, or already on the active list.
S64 mSleep; //!< Non-zero while the state machine is sleeping.
LLMutex mIdleActive; //!< Used for atomic operations on the pair mIdle / mActive.
LLMutex mSetStateLock; //!< For critical areas in set_state() and locked_cont().
#ifdef SHOW_ASSERT
apr_os_thread_t mContThread; //!< Thread that last called locked_cont().
bool mCalledThreadUnsafeIdle; //!< Set to true when idle() is called.
#endif
// Callback facilities.
// From within an other state machine:
@@ -233,16 +238,23 @@ class AIStateMachine {
public:
//! Create a non-running state machine.
AIStateMachine(void) : mState(bs_initialize), mIdle(true), mAborted(true), mActive(as_idle), mSleep(0), mParent(NULL), mCallback(NULL) { updateSettings(); }
AIStateMachine(void) : mState(bs_initialize), mIdle(true), mAborted(true), mActive(as_idle), mSleep(0), mParent(NULL), mCallback(NULL)
#ifdef SHOW_ASSERT
, mContThread(0), mCalledThreadUnsafeIdle(false)
#endif
{ updateSettings(); }
protected:
//! The user should call 'kill()', not delete a AIStateMachine (derived) directly.
virtual ~AIStateMachine() { llassert((mState == bs_killed && mActive == as_idle) || mState == bs_initialize); }
public:
//! Halt the state machine until cont() is called.
//! Halt the state machine until cont() is called (not thread-safe).
void idle(void);
//! Halt the state machine until cont() is called, provided it is still in 'cur_run_state'.
void idle(state_type current_run_state);
//! Temporarily halt the state machine.
void yield_frame(unsigned int frames) { mSleep = -(S64)frames; }
@@ -250,8 +262,18 @@ class AIStateMachine {
void yield_ms(unsigned int ms) { mSleep = LLFastTimer::getCPUClockCount64() + LLFastTimer::countsPerSecond() * ms / 1000; }
//! Continue running after calling idle.
void cont(void);
void cont(void)
{
mSetStateLock.lock();
// Ignore calls to cont() if the statemachine isn't idle. See comments in set_state().
// Calling cont() twice or after calling set_state(), without first calling idle(), is an error.
if (!mIdle) { llassert(mContThread != apr_os_thread_current()); mSetStateLock.unlock(); return; }
locked_cont();
}
private:
void locked_cont(void);
public:
//---------------------------------------
// Changing the state.