llmessage v2 merge.

This commit is contained in:
Shyotl
2011-07-15 02:04:57 -05:00
parent 7e0ee6bb71
commit 3fbfeb2de5
8 changed files with 124 additions and 261 deletions

View File

@@ -56,7 +56,6 @@ set(llmessage_SOURCE_FILES
llmessagethrottle.cpp
llmime.cpp
llnamevalue.cpp
llnetcanary.cpp
llnullcipher.cpp
llpacketack.cpp
llpacketbuffer.cpp
@@ -152,7 +151,6 @@ set(llmessage_HEADER_FILES
llmime.h
llmsgvariabletype.h
llnamevalue.h
llnetcanary.h
llnullcipher.h
llpacketack.h
llpacketbuffer.h

View File

@@ -178,7 +178,7 @@ LLSocket::ptr_t LLSocket::create(EType type, U16 port)
port = PORT_EPHEMERAL;
}
rv->mPort = port;
rv->setOptions();
rv->setNonBlocking();
return rv;
}
@@ -201,7 +201,7 @@ LLSocket::ptr_t LLSocket::create(apr_status_t& status, LLSocket::ptr_t& listen_s
return rv;
}
rv->mPort = PORT_EPHEMERAL;
rv->setOptions();
rv->setNonBlocking();
return rv;
}
@@ -221,10 +221,10 @@ bool LLSocket::blockingConnect(const LLHost& host)
{
return false;
}
apr_socket_timeout_set(mSocket, 1000);
setBlocking(1000);
ll_debug_socket("Blocking connect", mSocket);
if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
setOptions();
setNonBlocking();
return true;
}
@@ -246,11 +246,27 @@ LLSocket::~LLSocket()
}
}
void LLSocket::setOptions()
// See http://dev.ariel-networks.com/apr/apr-tutorial/html/apr-tutorial-13.html#ss13.4
// for an explanation of how to get non-blocking sockets and timeouts with
// consistent behavior across platforms.
void LLSocket::setBlocking(S32 timeout)
{
LLMemType m1(LLMemType::MTYPE_IO_TCP);
// set up the socket options
ll_apr_warn_status(apr_socket_timeout_set(mSocket, timeout));
ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_NONBLOCK, 0));
ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
}
void LLSocket::setNonBlocking()
{
LLMemType m1(LLMemType::MTYPE_IO_TCP);
// set up the socket options
ll_apr_warn_status(apr_socket_timeout_set(mSocket, 0));
ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_NONBLOCK, 1));
ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_SNDBUF, LL_SEND_BUFFER_SIZE));
ll_apr_warn_status(apr_socket_opt_set(mSocket, APR_SO_RCVBUF, LL_RECV_BUFFER_SIZE));
@@ -588,6 +604,7 @@ LLIOPipe::EStatus LLIOServerSocket::process_impl(
{
chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(llsocket)));
pump->addChain(chain, mResponseTimeout);
status = STATUS_OK;
}
else
{

View File

@@ -146,9 +146,16 @@ protected:
LLSocket(void);
/**
* @brief Set default socket options.
* @brief Set default socket options, with SO_NONBLOCK = 0 and a timeout in us.
* @param timeout Number of microseconds to wait on this socket. Any
* negative number means block-forever. TIMEOUT OF 0 IS NON-PORTABLE.
*/
void setOptions();
void setBlocking(S32 timeout);
/**
* @brief Set default socket options, with SO_NONBLOCK = 1 and timeout = 0.
*/
void setNonBlocking();
public:
/**

View File

@@ -1,149 +0,0 @@
// <edit>
#include "linden_common.h"
#include "llnetcanary.h"
#include "llerror.h"
#ifdef _MSC_VER
#include <winsock2.h>
static WSADATA trapWSAData;
#define socklen_t int
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#if LL_DARWIN
#ifndef _SOCKLEN_T
#define _SOCKLEN_T
typedef int socklen_t;
#endif
#endif
extern int errno; //error number
#define SOCKADDR_IN struct sockaddr_in
#define SOCKADDR struct sockaddr
#define SOCKET int
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#define SD_BOTH 2
#define closesocket close
#endif
SOCKADDR_IN trapLclAddr;
LLNetCanary::LLNetCanary(int requested_port)
{
mGood = TRUE;
int nRet;
int hSocket;
int snd_size = 400000;
int rec_size = 400000;
int buff_size = 4;
#ifdef _MSC_VER
if(WSAStartup(0x0202, &trapWSAData))
{
S32 err = WSAGetLastError();
WSACleanup();
llwarns << "WSAStartup() failure: " << err << llendl;
mGood = FALSE;
return;
}
#endif
// Get a datagram socket
hSocket = (int)socket(AF_INET, SOCK_DGRAM, 0);
if (hSocket == INVALID_SOCKET)
{
#ifdef _MSC_VER
S32 err = WSAGetLastError();
WSACleanup();
#else
S32 err = errno;
#endif
mGood = FALSE;
llwarns << "socket() failure: " << err << llendl;
return;
}
// Name the socket (assign the local port number to receive on)
trapLclAddr.sin_family = AF_INET;
trapLclAddr.sin_addr.s_addr = htonl(INADDR_ANY);
trapLclAddr.sin_port = htons(requested_port);
//llinfos << "bind() port: " << requested_port << llendl;
nRet = bind(hSocket, (struct sockaddr*) &trapLclAddr, sizeof(trapLclAddr));
if (nRet == SOCKET_ERROR)
{
#ifdef _MSC_VER
S32 err = WSAGetLastError();
WSACleanup();
#else
S32 err = errno;
#endif
mGood = FALSE;
llwarns << "bind() failed: " << err << llendl;
return;
}
// Find out what address we got
SOCKADDR_IN socket_address;
socklen_t socket_address_size = sizeof(socket_address);
getsockname(hSocket, (SOCKADDR*) &socket_address, &socket_address_size);
mPort = ntohs(socket_address.sin_port);
//llinfos << "got port " << mPort << llendl;
// Set socket to be non-blocking
#ifdef _MSC_VER
unsigned long argp = 1;
nRet = ioctlsocket (hSocket, FIONBIO, &argp);
#else
nRet = fcntl(hSocket, F_SETFL, O_NONBLOCK);
#endif
if (nRet == SOCKET_ERROR)
{
#ifdef _MSC_VER
S32 err = WSAGetLastError();
WSACleanup();
#else
S32 err = errno;
#endif
mGood = FALSE;
llwarns << "Failed to set socket non-blocking, Err: " << err << llendl;
return;
}
// set a large receive buffer
nRet = setsockopt(hSocket, SOL_SOCKET, SO_RCVBUF, (char *)&rec_size, buff_size);
if (nRet)
{
llinfos << "Can't set receive buffer size!" << llendl;
}
// set a large send buffer
nRet = setsockopt(hSocket, SOL_SOCKET, SO_SNDBUF, (char *)&snd_size, buff_size);
if (nRet)
{
llinfos << "Can't set send buffer size!" << llendl;
}
//getsockopt(hSocket, SOL_SOCKET, SO_RCVBUF, (char *)&rec_size, &buff_size);
//getsockopt(hSocket, SOL_SOCKET, SO_SNDBUF, (char *)&snd_size, &buff_size);
//LL_DEBUGS("AppInit") << "startNet - receive buffer size : " << rec_size << LL_ENDL;
//LL_DEBUGS("AppInit") << "startNet - send buffer size : " << snd_size << LL_ENDL;
mSocket = hSocket;
}
LLNetCanary::~LLNetCanary()
{
if(mGood)
{
if(mSocket)
{
shutdown(mSocket, SD_BOTH);
closesocket(mSocket);
}
#ifdef _MSC_VER
WSACleanup();
#endif
}
}
// </edit>

View File

@@ -1,25 +0,0 @@
// <edit>
#ifndef LL_LLNETCANARY_H
#define LL_LLNETCANARY_H
#include "stdtypes.h"
#include <string>
class LLNetCanary
{
public:
LLNetCanary(int requested_port);
~LLNetCanary();
BOOL mGood;
S32 mSocket;
U16 mPort;
U8 mBuffer[8192];
typedef struct
{
F64 time; // LLTimer::getElapsedSeconds();
//U8 message[4];
U32 message;
U32 points;
std::string name;
} entry;
};
#endif
// </edit>

View File

@@ -52,10 +52,13 @@ static const U32 HTTP_STATUS_PIPE_ERROR = 499;
* String constants
*/
const std::string CONTEXT_DEST_URI_SD_LABEL("dest_uri");
const std::string CONTEXT_TRANSFERED_BYTES("transfered_bytes");
static size_t headerCallback(void* data, size_t size, size_t nmemb, void* user);
/**
* class LLURLRequestDetail
*/
@@ -99,6 +102,26 @@ LLURLRequestDetail::~LLURLRequestDetail()
* class LLURLRequest
*/
// static
std::string LLURLRequest::actionAsVerb(LLURLRequest::ERequestAction action)
{
static const std::string VERBS[] =
{
"(invalid)",
"HEAD",
"GET",
"PUT",
"POST",
"DELETE",
"MOVE"
};
if(((S32)action <=0) || ((S32)action >= REQUEST_ACTION_COUNT))
{
return VERBS[0];
}
return VERBS[action];
}
LLURLRequest::LLURLRequest(LLURLRequest::ERequestAction action) :
mAction(action)
{
@@ -128,6 +151,10 @@ void LLURLRequest::setURL(const std::string& url)
mDetail->mURL = url;
}
std::string LLURLRequest::getURL() const
{
return mDetail->mURL;
}
void LLURLRequest::addHeader(const char* header)
{
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
@@ -201,6 +228,11 @@ void LLURLRequest::useProxy(const std::string &proxy)
mDetail->mCurlRequest->setoptString(CURLOPT_PROXY, proxy);
}
void LLURLRequest::allowCookies()
{
mDetail->mCurlRequest->setoptString(CURLOPT_COOKIEFILE, "");
}
// virtual
LLIOPipe::EStatus LLURLRequest::handleError(
LLIOPipe::EStatus status,
@@ -232,8 +264,30 @@ LLIOPipe::EStatus LLURLRequest::process_impl(
PUMP_DEBUG;
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
//llinfos << "LLURLRequest::process_impl()" << llendl;
if(!buffer) return STATUS_ERROR;
if(!mDetail) return STATUS_ERROR; //Seems to happen on occasion. Need to hunt down why.
if (!buffer) return STATUS_ERROR;
if (!mDetail) return STATUS_ERROR; //Seems to happen on occasion. Need to hunt down why.
// we're still waiting or prcessing, check how many
// bytes we have accumulated.
const S32 MIN_ACCUMULATION = 100000;
if(pump && (mDetail->mByteAccumulator > MIN_ACCUMULATION))
{
// This is a pretty sloppy calculation, but this
// tries to make the gross assumption that if data
// is coming in at 56kb/s, then this transfer will
// probably succeed. So, if we're accumlated
// 100,000 bytes (MIN_ACCUMULATION) then let's
// give this client another 2s to complete.
const F32 TIMEOUT_ADJUSTMENT = 2.0f;
mDetail->mByteAccumulator = 0;
pump->adjustTimeoutSeconds(TIMEOUT_ADJUSTMENT);
lldebugs << "LLURLRequest adjustTimeoutSeconds for request: " << mDetail->mURL << llendl;
if (mState == STATE_INITIALIZED)
{
llinfos << "LLURLRequest adjustTimeoutSeconds called during upload" << llendl;
}
}
switch(mState)
{
case STATE_INITIALIZED:
@@ -272,27 +326,14 @@ LLIOPipe::EStatus LLURLRequest::process_impl(
bool newmsg = mDetail->mCurlRequest->getResult(&result);
if(!newmsg)
{
// we're still waiting or prcessing, check how many
// bytes we have accumulated.
const S32 MIN_ACCUMULATION = 100000;
if(pump && (mDetail->mByteAccumulator > MIN_ACCUMULATION))
{
// This is a pretty sloppy calculation, but this
// tries to make the gross assumption that if data
// is coming in at 56kb/s, then this transfer will
// probably succeed. So, if we're accumlated
// 100,000 bytes (MIN_ACCUMULATION) then let's
// give this client another 2s to complete.
const F32 TIMEOUT_ADJUSTMENT = 2.0f;
mDetail->mByteAccumulator = 0;
pump->adjustTimeoutSeconds(TIMEOUT_ADJUSTMENT);
}
// keep processing
break;
}
mState = STATE_HAVE_RESPONSE;
context[CONTEXT_REQUEST][CONTEXT_TRANSFERED_BYTES] = mRequestTransferedBytes;
context[CONTEXT_RESPONSE][CONTEXT_TRANSFERED_BYTES] = mResponseTransferedBytes;
lldebugs << this << "Setting context to " << context << llendl;
switch(result)
{
case CURLE_OK:
@@ -339,10 +380,16 @@ LLIOPipe::EStatus LLURLRequest::process_impl(
// we already stuffed everything into channel in in the curl
// callback, so we are done.
eos = true;
context[CONTEXT_REQUEST][CONTEXT_TRANSFERED_BYTES] = mRequestTransferedBytes;
context[CONTEXT_RESPONSE][CONTEXT_TRANSFERED_BYTES] = mResponseTransferedBytes;
lldebugs << this << "Setting context to " << context << llendl;
return STATUS_DONE;
default:
PUMP_DEBUG;
context[CONTEXT_REQUEST][CONTEXT_TRANSFERED_BYTES] = mRequestTransferedBytes;
context[CONTEXT_RESPONSE][CONTEXT_TRANSFERED_BYTES] = mResponseTransferedBytes;
lldebugs << this << "Setting context to " << context << llendl;
return STATUS_ERROR;
}
}
@@ -355,6 +402,8 @@ void LLURLRequest::initialize()
mDetail->mCurlRequest->setopt(CURLOPT_NOSIGNAL, 1);
mDetail->mCurlRequest->setWriteCallback(&downCallback, (void*)this);
mDetail->mCurlRequest->setReadCallback(&upCallback, (void*)this);
mRequestTransferedBytes = 0;
mResponseTransferedBytes = 0;
}
bool LLURLRequest::configure()
@@ -375,6 +424,9 @@ bool LLURLRequest::configure()
case HTTP_GET:
mDetail->mCurlRequest->setopt(CURLOPT_HTTPGET, 1);
mDetail->mCurlRequest->setopt(CURLOPT_FOLLOWLOCATION, 1);
// Set Accept-Encoding to allow response compression
mDetail->mCurlRequest->setoptString(CURLOPT_ENCODING, "");
rv = true;
break;
@@ -399,6 +451,9 @@ bool LLURLRequest::configure()
// Set the handle for an http post
mDetail->mCurlRequest->setPost(NULL, bytes);
// Set Accept-Encoding to allow response compression
mDetail->mCurlRequest->setoptString(CURLOPT_ENCODING, "");
rv = true;
break;
@@ -457,6 +512,7 @@ size_t LLURLRequest::downCallback(
req->mDetail->mChannels.out(),
(U8*)data,
bytes);
req->mResponseTransferedBytes += bytes;
req->mDetail->mByteAccumulator += bytes;
return bytes;
}
@@ -480,6 +536,7 @@ size_t LLURLRequest::upCallback(
req->mDetail->mLastRead,
(U8*)data,
bytes);
req->mRequestTransferedBytes += bytes;
return bytes;
}

View File

@@ -45,6 +45,11 @@
#include "llchainio.h"
#include "llerror.h"
extern const std::string CONTEXT_REQUEST;
extern const std::string CONTEXT_DEST_URI_SD_LABEL;
extern const std::string CONTEXT_RESPONSE;
extern const std::string CONTEXT_TRANSFERED_BYTES;
class LLURLRequestDetail;
class LLURLRequestComplete;
@@ -81,6 +86,11 @@ public:
REQUEST_ACTION_COUNT
};
/**
* @brief Turn the requst action into an http verb.
*/
static std::string actionAsVerb(ERequestAction action);
/**
* @brief Constructor.
*
@@ -114,7 +124,7 @@ public:
*
*/
void setURL(const std::string& url);
std::string getURL() const;
/**
* @brief Add a header to the http post.
*
@@ -173,6 +183,11 @@ public:
*/
void useProxy(const std::string& proxy);
/**
* @brief Turn on cookie handling for this request with CURLOPT_COOKIEFILE.
*/
void allowCookies();
public:
/**
* @brief Give this pipe a chance to handle a generated error
@@ -203,6 +218,8 @@ protected:
ERequestAction mAction;
LLURLRequestDetail* mDetail;
LLIOPipe::ptr_t mCompletionCallback;
S32 mRequestTransferedBytes;
S32 mResponseTransferedBytes;
private:
/**
@@ -351,62 +368,6 @@ protected:
};
/**
* @class LLURLRequestClientFactory
* @brief Template class to build url request based client chains
*
* This class eases construction of a basic sd rpc client. Here is an
* example of it's use:
* <code>
* class LLUsefulService : public LLService { ... }<br>
* LLService::registerCreator(<br>
* "useful",<br>
* LLService::creator_t(new LLURLRequestClientFactory<LLUsefulService>))<br>
* </code>
*
* This class should work, but I never got around to using/testing it.
*
*/
#if 0
template<class Client>
class LLURLRequestClientFactory : public LLChainIOFactory
{
public:
LLURLRequestClientFactory(LLURLRequest::ERequestAction action) {}
LLURLRequestClientFactory(
LLURLRequest::ERequestAction action,
const std::string& fixed_url) :
mAction(action),
mURL(fixed_url)
{
}
virtual bool build(LLPumpIO::chain_t& chain, LLSD context) const
{
lldebugs << "LLURLRequestClientFactory::build" << llendl;
LLIOPipe::ptr_t service(new Client);
chain.push_back(service);
LLURLRequest* http(new LLURLRequest(mAction));
LLIOPipe::ptr_t http_pipe(http);
// *FIX: how do we know the content type?
//http->addHeader("Content-Type: text/llsd");
if(mURL.empty())
{
chain.push_back(LLIOPipe::ptr_t(new LLContextURLExtractor(http)));
}
else
{
http->setURL(mURL);
}
chain.push_back(http_pipe);
chain.push_back(service);
return true;
}
protected:
LLURLRequest::ERequestAction mAction;
std::string mURL;
};
#endif
/**
* External constants

View File

@@ -67,9 +67,6 @@
#include "llstoredmessage.h"
#include "llsocks5.h"
// <edit>
#include "llnetcanary.h"
// </edit>
const U32 MESSAGE_MAX_STRINGS_LENGTH = 64;
const U32 MESSAGE_NUMBER_OF_HASH_BUCKETS = 8192;