Make AICurlThread::wakeup_thread thread-safe.
This commit is contained in:
@@ -832,6 +832,7 @@ class AICurlThread : public LLThread
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
static AICurlThread* sInstance;
|
static AICurlThread* sInstance;
|
||||||
|
LLMutex mWakeUpMutex; // Set while a thread is waking up the curl thread.
|
||||||
LLMutex mWakeUpFlagMutex; // Set when the curl thread is sleeping (in or about to enter select()).
|
LLMutex mWakeUpFlagMutex; // Set when the curl thread is sleeping (in or about to enter select()).
|
||||||
bool mWakeUpFlag; // Protected by mWakeUpFlagMutex.
|
bool mWakeUpFlag; // Protected by mWakeUpFlagMutex.
|
||||||
|
|
||||||
@@ -1067,11 +1068,10 @@ void AICurlThread::cleanup_wakeup_fds(void)
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
// MAIN-THREAD
|
// OTHER THREADS
|
||||||
void AICurlThread::wakeup_thread(bool stop_thread)
|
void AICurlThread::wakeup_thread(bool stop_thread)
|
||||||
{
|
{
|
||||||
DoutEntering(dc::curl, "AICurlThread::wakeup_thread");
|
DoutEntering(dc::curl, "AICurlThread::wakeup_thread");
|
||||||
llassert(is_main_thread());
|
|
||||||
|
|
||||||
// If we are already exiting the viewer then return immediately.
|
// If we are already exiting the viewer then return immediately.
|
||||||
if (!mRunning)
|
if (!mRunning)
|
||||||
@@ -1079,13 +1079,30 @@ void AICurlThread::wakeup_thread(bool stop_thread)
|
|||||||
|
|
||||||
// Last time we are run?
|
// Last time we are run?
|
||||||
if (stop_thread)
|
if (stop_thread)
|
||||||
mRunning = false;
|
mRunning = false; // Thread-safe because all other threads were already stopped.
|
||||||
|
|
||||||
|
// Note, we do not want this function to be blocking the calling thread; therefore we only use tryLock()s.
|
||||||
|
|
||||||
|
// Stop two threads running the following code concurrently.
|
||||||
|
if (!mWakeUpMutex.tryLock())
|
||||||
|
{
|
||||||
|
// If we failed to obtain mWakeUpMutex then another thread is (or was) in AICurlThread::wakeup_thread,
|
||||||
|
// or curl was holding the lock for a micro second at the start of process_commands.
|
||||||
|
// In the first case, curl might or might not yet have been woken up because of that, but if it was
|
||||||
|
// then it could not have started processing the commands yet, because it needs to obtain mWakeUpMutex
|
||||||
|
// between being woken up and processing the commands.
|
||||||
|
// Either way, the command that this thread called this function for was already in the queue (it's
|
||||||
|
// added before this function is called) but the command(s) that another thread called this function
|
||||||
|
// for were not processed yet. Hence, it's safe to exit here as our command(s) will be processed too.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Try if curl thread is still awake and if so, pass the new commands directly.
|
// Try if curl thread is still awake and if so, pass the new commands directly.
|
||||||
if (mWakeUpFlagMutex.tryLock())
|
if (mWakeUpFlagMutex.tryLock())
|
||||||
{
|
{
|
||||||
mWakeUpFlag = true;
|
mWakeUpFlag = true;
|
||||||
mWakeUpFlagMutex.unlock();
|
mWakeUpFlagMutex.unlock();
|
||||||
|
mWakeUpMutex.unlock();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1111,7 +1128,10 @@ void AICurlThread::wakeup_thread(bool stop_thread)
|
|||||||
{
|
{
|
||||||
len = write(mWakeUpFd_in, "!", 1);
|
len = write(mWakeUpFd_in, "!", 1);
|
||||||
if (len == -1 && errno == EAGAIN)
|
if (len == -1 && errno == EAGAIN)
|
||||||
|
{
|
||||||
|
mWakeUpMutex.unlock();
|
||||||
return; // Unread characters are still in the pipe, so no need to add more.
|
return; // Unread characters are still in the pipe, so no need to add more.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
while(len == -1 && errno == EINTR);
|
while(len == -1 && errno == EINTR);
|
||||||
if (len == -1)
|
if (len == -1)
|
||||||
@@ -1120,6 +1140,12 @@ void AICurlThread::wakeup_thread(bool stop_thread)
|
|||||||
}
|
}
|
||||||
llassert_always(len == 1);
|
llassert_always(len == 1);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
// Release the lock here and not sooner, for the sole purpose of making sure
|
||||||
|
// that not two threads execute the above code concurrently. If the above code
|
||||||
|
// is thread-safe (maybe it is?) then we could release this lock arbitrarily
|
||||||
|
// sooner indeed - or even not lock it at all.
|
||||||
|
mWakeUpMutex.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
apr_status_t AICurlThread::join_thread(void)
|
apr_status_t AICurlThread::join_thread(void)
|
||||||
@@ -1239,6 +1265,16 @@ void AICurlThread::process_commands(AICurlMultiHandle_wat const& multi_handle_w)
|
|||||||
{
|
{
|
||||||
DoutEntering(dc::curl, "AICurlThread::process_commands(void)");
|
DoutEntering(dc::curl, "AICurlThread::process_commands(void)");
|
||||||
|
|
||||||
|
// Block here until the thread that woke us up released mWakeUpMutex.
|
||||||
|
// This is necessary to make sure that a third thread added commands
|
||||||
|
// too then either it will signal us later, or we process those commands
|
||||||
|
// now, too.
|
||||||
|
mWakeUpMutex.lock();
|
||||||
|
// Note that if at THIS point another thread tries to obtain mWakeUpMutex in AICurlThread::wakeup_thread
|
||||||
|
// and fails, it is ok that it leaves that function without waking us up too: we're awake and
|
||||||
|
// about to process any commands!
|
||||||
|
mWakeUpMutex.unlock();
|
||||||
|
|
||||||
// If we get here then the main thread called wakeup_thread() recently.
|
// If we get here then the main thread called wakeup_thread() recently.
|
||||||
for(;;)
|
for(;;)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user