Use TCP socket pair instead of UDP

This commit is contained in:
Siana Gearz
2012-08-02 22:33:01 +02:00
parent 725cdc2d69
commit 4650636e5a

View File

@@ -763,10 +763,7 @@ class AICurlThread : public LLThread
void create_wakeup_fds(void);
void cleanup_wakeup_fds(void);
#if !WINDOWS_CODE
//On Windows, single socket is used for communicating with itself! -SG
curl_socket_t mWakeUpFd_in;
#endif
curl_socket_t mWakeUpFd;
int mZeroTimeOut;
@@ -779,9 +776,7 @@ AICurlThread* AICurlThread::sInstance = NULL;
// MAIN-THREAD
AICurlThread::AICurlThread(void) : LLThread("AICurlThread"),
#if !WINDOWS_CODE
mWakeUpFd_in(CURL_SOCKET_BAD),
#endif
mWakeUpFd(CURL_SOCKET_BAD),
mZeroTimeOut(0), mRunning(true), mWakeUpFlag(false)
{
@@ -796,7 +791,7 @@ AICurlThread::~AICurlThread()
cleanup_wakeup_fds();
}
#if WINDOWS_CODE
#if LL_WINDOWS
static std::string formatWSAError()
{
std::ostringstream r;
@@ -807,11 +802,7 @@ static std::string formatWSAError()
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, e, 0, (LPTSTR)&error_str, 0, NULL))
{
#if LL_LINUX
r << " " << error_str;
#else
r << " " << utf16str_to_utf8str(error_str);
#endif
LocalFree(error_str);
}
else
@@ -820,58 +811,112 @@ static std::string formatWSAError()
}
return r.str();
}
#else if WINDOWS_CODE
static std::string formatWSAError()
{
return "NOT IMPLEMENTED";
}
#endif
#if LL_WINDOWS
/* Copyright 2007, 2010 by Nathan C. Myers <ncm@cantrip.org>
* This code is Free Software. It may be copied freely, in original or
* modified form, subject only to the restrictions that (1) the author is
* relieved from all responsibilities for any use for any purpose, and (2)
* this copyright notice must be retained, unchanged, in its entirety. If
* for any reason the author might be held responsible for any consequences
* of copying or use, license is withheld.
*/
static int dumb_socketpair(SOCKET socks[2], bool make_overlapped)
{
union {
struct sockaddr_in inaddr;
struct sockaddr addr;
} a;
SOCKET listener;
int e;
socklen_t addrlen = sizeof(a.inaddr);
DWORD flags = (make_overlapped ? WSA_FLAG_OVERLAPPED : 0);
int reuse = 1;
if (socks == 0) {
WSASetLastError(WSAEINVAL);
return SOCKET_ERROR;
}
listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (listener == INVALID_SOCKET)
return SOCKET_ERROR;
memset(&a, 0, sizeof(a));
a.inaddr.sin_family = AF_INET;
a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
a.inaddr.sin_port = 0;
socks[0] = socks[1] = INVALID_SOCKET;
do {
if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,
(char*) &reuse, (socklen_t) sizeof(reuse)) == -1)
break;
if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
break;
memset(&a, 0, sizeof(a));
if (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR)
break;
// win32 getsockname may only set the port number, p=0.0005.
// ( http://msdn.microsoft.com/library/ms738543.aspx ):
a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
a.inaddr.sin_family = AF_INET;
if (listen(listener, 1) == SOCKET_ERROR)
break;
socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, flags);
if (socks[0] == INVALID_SOCKET)
break;
if (connect(socks[0], &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR)
break;
socks[1] = accept(listener, NULL, NULL);
if (socks[1] == INVALID_SOCKET)
break;
closesocket(listener);
return 0;
} while (0);
e = WSAGetLastError();
closesocket(listener);
closesocket(socks[0]);
closesocket(socks[1]);
WSASetLastError(e);
return SOCKET_ERROR;
}
#else
int dumb_socketpair(int socks[2], int dummy)
{
(void) dummy;
return socketpair(AF_LOCAL, SOCK_STREAM, 0, socks);
}
#endif
// MAIN-THREAD
void AICurlThread::create_wakeup_fds(void)
{
#if WINDOWS_CODE
#if LL_LINUX
mWakeUpFd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
#else
mWakeUpFd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, 0);
#endif
if(mWakeUpFd == INVALID_SOCKET)
{
llerrs << "Failed to create wake-up socket: " << formatWSAError() << llendl;
}
int error;
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
#if LL_LINUX
socklen_t addrlen = sizeof(addr);
#else
int addrlen = sizeof(addr);
#endif
error = bind(mWakeUpFd, (sockaddr*) &addr, addrlen);
if(error)
{
llerrs << "Failed to bind wake-up socket: " << formatWSAError() << llendl;
}
error = getsockname(mWakeUpFd, (sockaddr*) &addr, &addrlen);
if(error)
{
llerrs << "Failed to detect wake-up socket: " << formatWSAError() << llendl;
}
error = connect(mWakeUpFd, (sockaddr*) &addr, addrlen);
if(error)
{
llerrs << "Failed to connect wake-up socket: " <<formatWSAError() << llendl;
}
#if LL_LINUX
long flags = O_NONBLOCK;
error = fcntl(mWakeUpFd, F_SETFL, flags);
#else
u_long nonblocking_enable = TRUE;
error = ioctlsocket(mWakeUpFd, FIONBIO, &nonblocking_enable);
#endif
if(error)
{
llerrs << "Failed to set wake-up socket nonblocking: " << formatWSAError() << llendl;
}
//SGTODO
SOCKET socks[2];
if (dumb_socketpair(socks, false) == SOCKET_ERROR)
{
llerrs << "Failed to generate wake-up socket pair" << formatWSAError() << llendl;
}
else
{
mWakeUpFd = socks[0];
mWakeUpFd_in = socks[1];
}
#else
int pipefd[2];
if (pipe(pipefd))
@@ -895,14 +940,23 @@ void AICurlThread::create_wakeup_fds(void)
void AICurlThread::cleanup_wakeup_fds(void)
{
#if WINDOWS_CODE
if (mWakeUpFd != CURL_SOCKET_BAD)
{
int error = closesocket(mWakeUpFd);
if (error)
//SGTODO
if (mWakeUpFd != CURL_SOCKET_BAD)
{
llwarns << "Error closing wake-up socket" << formatWSAError() << llendl;
int error = closesocket(mWakeUpFd);
if (error)
{
llwarns << "Error closing wake-up socket" << formatWSAError() << llendl;
}
}
if (mWakeUpFd_in != CURL_SOCKET_BAD)
{
int error = closesocket(mWakeUpFd_in);
if (error)
{
llwarns << "Error closing wake-up input socket" << formatWSAError() << llendl;
}
}
}
#else
if (mWakeUpFd_in != CURL_SOCKET_BAD)
close(mWakeUpFd_in);
@@ -927,11 +981,15 @@ void AICurlThread::wakeup_thread(void)
#if WINDOWS_CODE
//SGTODO
int len = send(mWakeUpFd, "!", 1, 0);
Dout(dc::curl, "ENTERING send()");
int len = send(mWakeUpFd_in, "!", 1, 0);
Dout(dc::curl, "LEAVING send()");
if (len == SOCKET_ERROR)
{
llerrs << "Send to wake-up socket failed: " << formatWSAError() << llendl;
}
llassert_always(len == 1);
//SGTODO: handle EAGAIN if needed
llinfos << "Sent wakeup signal" << llendl;
#else
// If write() is interrupted by a signal before it writes any data, it shall return -1 with errno set to [EINTR].
@@ -962,31 +1020,49 @@ void AICurlThread::wakeup(AICurlMultiHandle_wat const& multi_handle_w)
DoutEntering(dc::curl, "AICurlThread::wakeup");
#if WINDOWS_CODE
char buf;
int len;
//SGTODO
char buf[256];
bool got_data = false;
do
for(;;)
{
len = recv(mWakeUpFd, &buf, sizeof(buf), 0);
llinfos << "recv returns " << len << llendl;
if(len != SOCKET_ERROR)
int len = recv(mWakeUpFd, buf, sizeof(buf), 0);
if (len > 0)
{
// Data was read from the pipe.
got_data = true;
if (len < sizeof(buf))
break;
}
else if (len == SOCKET_ERROR)
{
// An error occurred.
if (errno == EWOULDBLOCK)
{
got_data = true;
llinfos << "Received wakeup signal" << llendl;
if (got_data)
break;
// There was no data, even though select() said so. If this ever happens at all(?), lets just return and enter select() again.
return;
}
} while(len != SOCKET_ERROR);
int error = WSAGetLastError();
llinfos << "left loop, errorlevel " << error << llendl;
if(error != WSAEWOULDBLOCK)
{
llerrs << "Wake-up socket drain error: " << formatWSAError() << llendl;
}
if(!got_data)
{
llinfos << "Wakeup called but socket is empty" << llendl;
else if (errno == EINTR)
{
continue;
}
else
{
llerrs << "read(3) from mWakeUpFd: " << formatWSAError() << llendl;
return;
}
}
else
{
// pipe(2) returned 0.
llwarns << "read(3) from mWakeUpFd returned 0, indicating that the pipe on the other end was closed! Shutting down curl thread." << llendl;
closesocket(mWakeUpFd);
mWakeUpFd = CURL_SOCKET_BAD;
mRunning = false;
return;
}
}
llinfos << "Passing control to process_commands" << llendl;
#else
// If a read() is interrupted by a signal before it reads any data, it shall return -1 with errno set to [EINTR].
// If a read() is interrupted by a signal after it has successfully read some data, it shall return the number of bytes read.