Work in progress

This work extends AIStateMachine to run multiplex() in the thread
that calls run(), cont() or set_state(). Note that all three
eventually call locked_cont(), so thats where multiplex() is called
from. Calling multiplex() means "running the state machine", as in
"calling multiplex_impl".

Currently only LLURLRequest uses this feature, and then only
for the HTTPGetResponder, and well only for the initializing,
start up and normal finish states.

A current/remaining problem is that we run into a situation where
the curl thread runs a statemachine to it's finish and kills it,
while the main thread is also 'running' it and tries to call
multiplex while the statemachine isn't running anymore.
This commit is contained in:
Aleric Inglewood
2013-02-20 23:29:34 +01:00
parent ff3910a705
commit ef35aa7954
14 changed files with 235 additions and 75 deletions

View File

@@ -832,8 +832,9 @@ class AICurlThread : public LLThread
{
public:
static AICurlThread* sInstance;
LLMutex mWakeUpMutex;
bool mWakeUpFlag; // Protected by mWakeUpMutex.
LLMutex mWakeUpMutex; // Set while a thread is waking up the curl thread.
LLMutex mWakeUpFlagMutex; // Set when the curl thread is sleeping (in or about to enter select()).
bool mWakeUpFlag; // Protected by mWakeUpFlagMutex.
public:
// MAIN-THREAD
@@ -1067,11 +1068,10 @@ void AICurlThread::cleanup_wakeup_fds(void)
#endif
}
// MAIN-THREAD
// OTHER THREADS
void AICurlThread::wakeup_thread(bool stop_thread)
{
DoutEntering(dc::curl, "AICurlThread::wakeup_thread");
llassert(is_main_thread());
// If we are already exiting the viewer then return immediately.
if (!mRunning)
@@ -1079,12 +1079,20 @@ void AICurlThread::wakeup_thread(bool stop_thread)
// Last time we are run?
if (stop_thread)
mRunning = false;
mRunning = false; // Thread-safe because all other threads were already stopped.
// Stop two threads running the following code at the same time.
if (!mWakeUpMutex.tryLock())
{
// If another thread is already busy waking up the curl thread, then we don't have to.
return;
}
// Try if curl thread is still awake and if so, pass the new commands directly.
if (mWakeUpMutex.tryLock())
if (mWakeUpFlagMutex.tryLock())
{
mWakeUpFlag = true;
mWakeUpFlagMutex.unlock();
mWakeUpMutex.unlock();
return;
}
@@ -1111,7 +1119,10 @@ void AICurlThread::wakeup_thread(bool stop_thread)
{
len = write(mWakeUpFd_in, "!", 1);
if (len == -1 && errno == EAGAIN)
{
mWakeUpMutex.unlock();
return; // Unread characters are still in the pipe, so no need to add more.
}
}
while(len == -1 && errno == EINTR);
if (len == -1)
@@ -1120,6 +1131,7 @@ void AICurlThread::wakeup_thread(bool stop_thread)
}
llassert_always(len == 1);
#endif
mWakeUpMutex.unlock();
}
apr_status_t AICurlThread::join_thread(void)
@@ -1247,9 +1259,9 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w)
command_queue_wat command_queue_w(command_queue);
if (command_queue_w->empty())
{
mWakeUpMutex.lock();
mWakeUpFlagMutex.lock();
mWakeUpFlag = false;
mWakeUpMutex.unlock();
mWakeUpFlagMutex.unlock();
break;
}
// Move the next command from the queue into command_being_processed.
@@ -1312,10 +1324,10 @@ void AICurlThread::run(void)
// Process every command in command_queue before filling the fd_set passed to select().
for(;;)
{
mWakeUpMutex.lock();
mWakeUpFlagMutex.lock();
if (mWakeUpFlag)
{
mWakeUpMutex.unlock();
mWakeUpFlagMutex.unlock();
process_commands(multi_handle_w);
continue;
}
@@ -1324,7 +1336,7 @@ void AICurlThread::run(void)
// wakeup_thread() is also called after setting mRunning to false.
if (!mRunning)
{
mWakeUpMutex.unlock();
mWakeUpFlagMutex.unlock();
break;
}
@@ -1400,7 +1412,7 @@ void AICurlThread::run(void)
#endif
#endif
ready = select(nfds, read_fd_set, write_fd_set, NULL, &timeout);
mWakeUpMutex.unlock();
mWakeUpFlagMutex.unlock();
#ifdef CWDEBUG
#ifdef DEBUG_CURLIO
Dout(dc::finish|cond_error_cf(ready == -1), ready);