/** * @file aiperservice.cpp * @brief Implementation of AIPerService * * Copyright (c) 2012, 2013, Aleric Inglewood. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * * There are special exceptions to the terms and conditions of the GPL as * it is applied to this Source Code. View the full text of the exception * in the file doc/FLOSS-exception.txt in this software distribution. * * CHANGELOG * and additional copyright holders. * * 04/11/2012 * Initial version, written by Aleric Inglewood @ SL * * 06/04/2013 * Renamed AICurlPrivate::PerHostRequestQueue[Ptr] to AIPerHostRequestQueue[Ptr] * to allow public access. * * 09/04/2013 * Renamed everything "host" to "service" and use "hostname:port" as key * instead of just "hostname". */ #include "sys.h" #include "aicurlperservice.h" #include "aicurlthread.h" #include "llcontrol.h" AIPerService::threadsafe_instance_map_type AIPerService::sInstanceMap; AIThreadSafeSimpleDC AIPerService::sTotalQueued; #undef AICurlPrivate namespace AICurlPrivate { // Cached value of CurlConcurrentConnectionsPerService. U16 CurlConcurrentConnectionsPerService; // Friend functions of RefCountedThreadSafePerService void intrusive_ptr_add_ref(RefCountedThreadSafePerService* per_service) { per_service->mReferenceCount++; } void intrusive_ptr_release(RefCountedThreadSafePerService* per_service) { if (--per_service->mReferenceCount == 0) { delete per_service; } } } // namespace AICurlPrivate using namespace AICurlPrivate; AIPerService::AIPerService(void) : mHTTPBandwidth(25), // 25 = 1000 ms / 40 ms. mConcurrentConnections(CurlConcurrentConnectionsPerService), mApprovedRequests(0), mTotalAdded(0), mEventPolls(0), mEstablishedConnections(0), mUsedCT(0), mCTInUse(0) { } AIPerService::CapabilityType::CapabilityType(void) : mApprovedRequests(0), mQueuedCommands(0), mAdded(0), mFlags(0), mDownloading(0), mMaxPipelinedRequests(CurlConcurrentConnectionsPerService), mConcurrentConnections(CurlConcurrentConnectionsPerService) { } AIPerService::CapabilityType::~CapabilityType() { } // Fake copy constructor. AIPerService::AIPerService(AIPerService const&) : mHTTPBandwidth(0) { } // url must be of the form // (see http://www.ietf.org/rfc/rfc3986.txt Appendix A for definitions not given here): // // url = sheme ":" hier-part [ "?" query ] [ "#" fragment ] // hier-part = "//" authority path-abempty // authority = [ userinfo "@" ] host [ ":" port ] // path-abempty = *( "/" segment ) // // That is, a hier-part of the form '/ path-absolute', '/ path-rootless' or // '/ path-empty' is NOT allowed here. This should be safe because we only // call this function for curl access, any file access would use APR. // // However, as a special exception, this function allows: // // url = authority path-abempty // // without the 'sheme ":" "//"' parts. // // As follows from the ABNF (see RFC, Appendix A): // - authority is either terminated by a '/' or by the end of the string because // neither userinfo, host nor port may contain a '/'. // - userinfo does not contain a '@', and if it exists, is always terminated by a '@'. // - port does not contain a ':', and if it exists is always prepended by a ':'. // // This function also needs to deal with full paths, in which case it should return // an empty string. // // Full paths can have the form: "/something..." // or "C:\something..." // and maybe even "C:/something..." // // The first form leads to an empty string being returned because the '/' signals the // end of the authority and we'll return immediately. // The second one will abort when hitting the backslash because that is an illegal // character in an url (before the first '/' anyway). // The third will abort because "C:" would be the hostname and a colon in the hostname // is not legal. // //static std::string AIPerService::extract_canonical_servicename(std::string const& url) { char const* p = url.data(); char const* const end = p + url.size(); char const* sheme_colon = NULL; char const* sheme_slash = NULL; char const* first_ampersand = NULL; char const* port_colon = NULL; std::string servicename; char const* hostname = p; // Default in the case there is no "sheme://userinfo@". while (p < end) { int c = *p; if (c == ':') { if (!port_colon && LLStringOps::isDigit(p[1])) { port_colon = p; } else if (!sheme_colon && !sheme_slash && !first_ampersand && !port_colon) { // Found a colon before any slash or ampersand: this has to be the colon between the sheme and the hier-part. sheme_colon = p; } } else if (c == '/') { if (!sheme_slash && sheme_colon && sheme_colon == p - 1 && !first_ampersand && p[1] == '/') { // Found the first '/' in the first occurance of the sequence "://". sheme_slash = p; hostname = ++p + 1; // Point hostname to the start of the authority, the default when there is no "userinfo@" part. servicename.clear(); // Remove the sheme. } else { // Found a slash that is not part of the "sheme://" string. Signals end of authority. // We're done. if (hostname < sheme_colon) { // This happens when windows filenames are passed to this function of the form "C:/..." servicename.clear(); } break; } } else if (c == '@') { if (!first_ampersand) { first_ampersand = p; hostname = p + 1; servicename.clear(); // Remove the "userinfo@" } } else if (c == '\\') { // Found a backslash, which is an illegal character for an URL. This is a windows path... reject it. servicename.clear(); break; } if (p >= hostname) { // Convert hostname to lowercase in a way that we compare two hostnames equal iff libcurl does. #if APR_CHARSET_EBCDIC #error Not implemented #else if (c >= 'A' && c <= 'Z') c += ('a' - 'A'); #endif servicename += c; } ++p; } // Strip off any trailing ":80". if (p - 3 == port_colon && p[-1] == '0' && p[-2] == '8') { return servicename.substr(0, p - hostname - 3); } return servicename; } //static AIPerServicePtr AIPerService::instance(std::string const& servicename) { llassert(!servicename.empty()); instance_map_wat instance_map_w(sInstanceMap); AIPerService::iterator iter = instance_map_w->find(servicename); if (iter == instance_map_w->end()) { iter = instance_map_w->insert(instance_map_type::value_type(servicename, new RefCountedThreadSafePerService)).first; Dout(dc::curlio, "Created new service \"" << servicename << "\" [" << (void*)&*PerService_rat(*iter->second) << "]"); } // Note: the creation of AIPerServicePtr MUST be protected by the lock on sInstanceMap (see release()). return iter->second; } //static void AIPerService::release(AIPerServicePtr& instance) { if (instance->exactly_two_left()) // Being 'instance' and the one in sInstanceMap. { // The viewer can be have left main() we can't access the global sInstanceMap anymore. if (LLApp::isStopped()) { return; } instance_map_wat instance_map_w(sInstanceMap); // It is possible that 'exactly_two_left' is not up to date anymore. // Therefore, recheck the condition now that we have locked sInstanceMap. if (!instance->exactly_two_left()) { // Some other thread added this service in the meantime. return; } #ifdef SHOW_ASSERT { // The reference in the map is the last one; that means there can't be any curl easy requests queued for this service. PerService_rat per_service_r(*instance); for (int i = 0; i < number_of_capability_types; ++i) { llassert(per_service_r->mCapabilityType[i].mQueuedRequests.empty()); } } #endif // Find the service and erase it from the map. iterator const end = instance_map_w->end(); for(iterator iter = instance_map_w->begin(); iter != end; ++iter) { if (instance == iter->second) { instance_map_w->erase(iter); instance.reset(); return; } } // We should always find the service. llassert(false); } instance.reset(); } void AIPerService::redivide_connections(void) { // Priority order. static AICapabilityType order[number_of_capability_types] = { cap_inventory, cap_texture, cap_mesh, cap_other }; // Count the number of capability types that are currently in use and store the types in an array. AICapabilityType used_order[number_of_capability_types]; int number_of_capability_types_in_use = 0; for (int i = 0; i < number_of_capability_types; ++i) { U32 const mask = CT2mask(order[i]); if ((mCTInUse & mask)) { used_order[number_of_capability_types_in_use++] = order[i]; } else { // Give every other type (that is not in use) one connection, so they can be used (at which point they'll get more). mCapabilityType[order[i]].mConcurrentConnections = 1; } } // Keep one connection in reserve for currently unused capability types (that have been used before). int reserve = (mUsedCT != mCTInUse) ? 1 : 0; // Distribute (mConcurrentConnections - reserve) over number_of_capability_types_in_use. U16 max_connections_per_CT = (mConcurrentConnections - reserve) / number_of_capability_types_in_use + 1; // The first count CTs get max_connections_per_CT connections. int count = (mConcurrentConnections - reserve) % number_of_capability_types_in_use; for(int i = 1, j = 0;; --i) { while (j < count) { mCapabilityType[used_order[j++]].mConcurrentConnections = max_connections_per_CT; } if (i == 0) { break; } // Finish the loop till all used CTs are assigned. count = number_of_capability_types_in_use; // Never assign 0 as maximum. if (max_connections_per_CT > 1) { // The remaining CTs get one connection less so that the sum of all assigned connections is mConcurrentConnections - reserve. --max_connections_per_CT; } } } bool AIPerService::throttled(AICapabilityType capability_type) const { return mTotalAdded >= mConcurrentConnections || mCapabilityType[capability_type].mAdded >= mCapabilityType[capability_type].mConcurrentConnections; } void AIPerService::added_to_multi_handle(AICapabilityType capability_type, bool event_poll) { if (event_poll) { llassert(capability_type == cap_other); // We want to mark this service as unused when only long polls have been added, because they // are not counted towards the maximum number of connection for this service and therefore // should not cause another capability type to get less connections. // For example, if - like on opensim - Textures and Other capability types use the same // service then it is nonsense to reserve 4 connections Other and only give 4 connections // to Textures, only because there is a long poll connection (or any number of long poll // connections). What we want is to see: 0-0-0,{0/7,0} for textures when Other is ONLY in // use for the Event Poll. // // This translates to that, since we're adding an event_poll and are about to remove it from // either the command queue OR the request queue, that when mAdded == 1 at the end of this function // (and the rest of the pipeline is empty) we want to mark this capability type as unused. // // If mEventPolls > 0 at this point then mAdded will not be incremented. // If mEventPolls == 0 then mAdded will be incremented and thus should be 0 now. // In other words, if the number of mAdded requests is equal to the number of (counted) // mEventPoll requests right now, then that will still be the case after we added another // event poll request (the transition from used to unused only being necessary because // event poll requests in the pipe line ARE counted; not because that is necessary but // because it would be more complex to not do so). // // Moreover, when we get here then the request that is being added is still counted as being in // the command queue, or the request queue, so that pipelined_requests() will return 1 more than // the actual count. U16 counted_event_polls = (mEventPolls == 0) ? 0 : 1; if (mCapabilityType[capability_type].mAdded == counted_event_polls && mCapabilityType[capability_type].pipelined_requests() == counted_event_polls + 1) { mark_unused(capability_type); } if (++mEventPolls > 1) { // This only happens on megaregions. Do not count the additional long poll connections against the maximum handles for this service. return; } } ++mCapabilityType[capability_type].mAdded; ++mTotalAdded; } void AIPerService::removed_from_multi_handle(AICapabilityType capability_type, bool event_poll, bool downloaded_something, bool success) { CapabilityType& ct(mCapabilityType[capability_type]); llassert(mTotalAdded > 0 && ct.mAdded > 0 && (!event_poll || mEventPolls)); if (!event_poll || --mEventPolls == 0) { --ct.mAdded; --mTotalAdded; } if (downloaded_something) { llassert(ct.mDownloading > 0); --ct.mDownloading; } // If the number of added request handles is equal to the number of counted event poll handles, // in other words, when there are only long poll connections left, then mark the capability type // as unused. U16 counted_event_polls = (capability_type != cap_other || mEventPolls == 0) ? 0 : 1; if (ct.mAdded == counted_event_polls && ct.pipelined_requests() == counted_event_polls) { mark_unused(capability_type); } if (success) { ct.mFlags |= ctf_success; } } // Returns true if the request was queued. bool AIPerService::queue(AICurlEasyRequest const& easy_request, AICapabilityType capability_type, bool force_queuing) { CapabilityType::queued_request_type& queued_requests(mCapabilityType[capability_type].mQueuedRequests); bool needs_queuing = force_queuing || !queued_requests.empty(); if (needs_queuing) { queued_requests.push_back(easy_request.get_ptr()); if (is_approved(capability_type)) { TotalQueued_wat(sTotalQueued)->approved++; } } return needs_queuing; } bool AIPerService::cancel(AICurlEasyRequest const& easy_request, AICapabilityType capability_type) { CapabilityType::queued_request_type::iterator const end = mCapabilityType[capability_type].mQueuedRequests.end(); CapabilityType::queued_request_type::iterator cur = std::find(mCapabilityType[capability_type].mQueuedRequests.begin(), end, easy_request.get_ptr()); if (cur == end) return false; // Not found. // We can't use erase because that uses assignment to move elements, // because it isn't thread-safe. Therefore, move the element that we found to // the back with swap (could just swap with the end immediately, but I don't // want to break the order in which requests where added). Swap is also not // thread-safe, but OK here because it only touches the objects in the deque, // and the deque is protected by the lock on the AIPerService object. CapabilityType::queued_request_type::iterator prev = cur; while (++cur != end) { prev->swap(*cur); // This is safe, prev = cur; } mCapabilityType[capability_type].mQueuedRequests.pop_back(); // if this is safe. if (is_approved(capability_type)) { TotalQueued_wat total_queued_w(sTotalQueued); llassert(total_queued_w->approved > 0); total_queued_w->approved--; } return true; } void AIPerService::add_queued_to(curlthread::MultiHandle* multi_handle, bool only_this_service) { U32 success = 0; // The CTs that we successfully added a request for from the queue. bool success_this_pass = false; int i = 0; // The first pass we only look at CTs with 0 requests added to the multi handle. Subsequent passes only non-zero ones. for (int pass = 0;; ++i) { if (i == number_of_capability_types) { i = 0; // Keep trying until we couldn't add anything anymore. if (pass++ && !success_this_pass) { // Done. break; } success_this_pass = false; } CapabilityType& ct(mCapabilityType[i]); if (!pass != !ct.mAdded) // Does mAdded match what we're looking for (first mAdded == 0, then mAdded != 0)? { continue; } if (multi_handle->added_maximum()) { // We hit the maximum number of global connections. Abort every attempt to add anything. only_this_service = true; break; } if (mTotalAdded >= mConcurrentConnections) { // We hit the maximum number of connections for this service. Abort any attempt to add anything to this service. break; } if (ct.mAdded >= ct.mConcurrentConnections) { // We hit the maximum number of connections for this capability type. Try the next one. continue; } U32 mask = CT2mask((AICapabilityType)i); if (ct.mQueuedRequests.empty()) // Is there anything in the queue (left) at all? { // We could add a new request, but there is none in the queue! // Note that if this service does not serve this capability type, // then obviously this queue was empty; however, in that case // this variable will never be looked at, so it's ok to set it. ct.mFlags |= ((success & mask) ? ctf_empty : ctf_starvation); } else { // Attempt to add the front of the queue. if (!multi_handle->add_easy_request(ct.mQueuedRequests.front(), true)) { // If that failed then we got throttled on bandwidth because the maximum number of connections were not reached yet. // Therefore this will keep failing for this service, we abort any additional attempt to add something for this service. break; } // Request was added, remove it from the queue. // Note: AIPerService::added_to_multi_handle (called from add_easy_request above) relies on the fact that // we first add the easy handle and then remove it from the request queue (which is necessary to avoid // that another thread adds one just in between). ct.mQueuedRequests.pop_front(); // Mark that at least one request of this CT was successfully added. success |= mask; success_this_pass = true; // Update approved count. if (is_approved((AICapabilityType)i)) { TotalQueued_wat total_queued_w(sTotalQueued); llassert(total_queued_w->approved > 0); total_queued_w->approved--; } } } size_t queuedapproved_size = 0; for (int i = 0; i < number_of_capability_types; ++i) { CapabilityType& ct(mCapabilityType[i]); U32 mask = CT2mask((AICapabilityType)i); // Add up the size of all queues with approved requests. if ((approved_mask & mask)) { queuedapproved_size += ct.mQueuedRequests.size(); } // Skip CTs that we didn't add anything for. if (!(success & mask)) { continue; } if (!ct.mQueuedRequests.empty()) { // We obtained one or more requests from the queue, and even after that there was at least one more request in the queue of this CT. ct.mFlags |= ctf_full; } } // Update the flags of sTotalQueued. { TotalQueued_wat total_queued_w(sTotalQueued); if (total_queued_w->approved == 0) { if ((success & approved_mask)) { // We obtained an approved request from the queue, and after that there were no more requests in any (approved) queue. total_queued_w->empty = true; } else { // Every queue of every approved CT is empty! total_queued_w->starvation = true; } } else if ((success & approved_mask)) { // We obtained an approved request from the queue, and even after that there was at least one more request in some (approved) queue. total_queued_w->full = true; } } // Don't try other services if anything was added successfully. if (success || only_this_service) { return; } // Nothing from this service could be added, try other services. instance_map_wat instance_map_w(sInstanceMap); for (iterator service = instance_map_w->begin(); service != instance_map_w->end(); ++service) { PerService_wat per_service_w(*service->second); if (&*per_service_w == this) { continue; } per_service_w->add_queued_to(multi_handle, true); } } //static void AIPerService::purge(void) { instance_map_wat instance_map_w(sInstanceMap); for (iterator service = instance_map_w->begin(); service != instance_map_w->end(); ++service) { Dout(dc::curl, "Purging queues of service \"" << service->first << "\"."); PerService_wat per_service_w(*service->second); TotalQueued_wat total_queued_w(sTotalQueued); for (int i = 0; i < number_of_capability_types; ++i) { size_t s = per_service_w->mCapabilityType[i].mQueuedRequests.size(); per_service_w->mCapabilityType[i].mQueuedRequests.clear(); if (is_approved((AICapabilityType)i)) { llassert(total_queued_w->approved >= (S32)s); total_queued_w->approved -= s; } } } } //static void AIPerService::adjust_concurrent_connections(int increment) { instance_map_wat instance_map_w(sInstanceMap); for (AIPerService::iterator iter = instance_map_w->begin(); iter != instance_map_w->end(); ++iter) { PerService_wat per_service_w(*iter->second); U16 old_concurrent_connections = per_service_w->mConcurrentConnections; int new_concurrent_connections = llclamp(old_concurrent_connections + increment, 1, (int)CurlConcurrentConnectionsPerService); per_service_w->mConcurrentConnections = (U16)new_concurrent_connections; increment = per_service_w->mConcurrentConnections - old_concurrent_connections; for (int i = 0; i < number_of_capability_types; ++i) { per_service_w->mCapabilityType[i].mMaxPipelinedRequests = llmax(per_service_w->mCapabilityType[i].mMaxPipelinedRequests + increment, 0); int new_concurrent_connections_per_capability_type = llclamp((new_concurrent_connections * per_service_w->mCapabilityType[i].mConcurrentConnections + old_concurrent_connections / 2) / old_concurrent_connections, 1, new_concurrent_connections); per_service_w->mCapabilityType[i].mConcurrentConnections = (U16)new_concurrent_connections_per_capability_type; } } } void AIPerService::ResetUsed::operator()(AIPerService::instance_map_type::value_type const& service) const { PerService_wat(*service.second)->resetUsedCt(); } void AIPerService::Approvement::honored(void) { if (!mHonored) { mHonored = true; PerService_wat per_service_w(*mPerServicePtr); llassert(per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests > 0 && per_service_w->mApprovedRequests > 0); per_service_w->mCapabilityType[mCapabilityType].mApprovedRequests--; per_service_w->mApprovedRequests--; } } void AIPerService::Approvement::not_honored(void) { honored(); LL_WARNS() << "Approvement for has not been honored." << LL_ENDL; }