Report delayed errors (#66)
This commit is contained in:
90
server.py
90
server.py
@@ -190,12 +190,28 @@ def announce():
|
||||
server["total_clients"] = server["clients"]
|
||||
server["pop_v"] = server["total_clients"] / server["updates"]
|
||||
|
||||
old_err = errorTracker.get(getErrorPK(server))
|
||||
|
||||
finishRequestAsync(server)
|
||||
|
||||
if old_err:
|
||||
return ("Request has been filed, "
|
||||
"but the previous request encountered the following error:\n" +
|
||||
old_err, 409)
|
||||
return "Request has been filed.", 202
|
||||
|
||||
# Utilities
|
||||
|
||||
# returns a primary key suitable for saving and replaying an error unique to a
|
||||
# server that was announced.
|
||||
def getErrorPK(server):
|
||||
# some failures depend on the client IP too
|
||||
return(server["ip"], server["address"], server["port"])
|
||||
|
||||
# check if something is a domain name (approximate)
|
||||
def isDomain(s):
|
||||
return "." in s and s.rpartition(".")[2][0].isalpha()
|
||||
|
||||
# Returns ping time in seconds (up), False (down), or None (error).
|
||||
def serverUp(info):
|
||||
sock = None
|
||||
@@ -211,7 +227,7 @@ def serverUp(info):
|
||||
# [7] u8 type (PACKET_TYPE_ORIGINAL)
|
||||
buf = b"\x4f\x45\x74\x03\x00\x00\x00\x01"
|
||||
sock.send(buf)
|
||||
start = time.time()
|
||||
start = time.monotonic()
|
||||
# receive reliable packet of type CONTROL, subtype SET_PEER_ID,
|
||||
# with our assigned peer id as data
|
||||
# [0] u32 protocol_id (PROTOCOL_ID)
|
||||
@@ -223,7 +239,7 @@ def serverUp(info):
|
||||
# [11] u8 controltype (CONTROLTYPE_SET_PEER_ID)
|
||||
# [12] session_t peer_id_new
|
||||
data = sock.recv(1024)
|
||||
end = time.time()
|
||||
end = time.monotonic()
|
||||
if not data:
|
||||
return False
|
||||
peer_id = data[12:14]
|
||||
@@ -248,10 +264,6 @@ def serverUp(info):
|
||||
|
||||
|
||||
def checkRequestAddress(server):
|
||||
# will fall back to IP of requester, can't possibly be wrong
|
||||
if "address" not in server or not server["address"]:
|
||||
return
|
||||
|
||||
name = server["address"].lower()
|
||||
|
||||
# example value from minetest.conf
|
||||
@@ -393,6 +405,10 @@ def checkRequest(server):
|
||||
s = s.replace(c, "")
|
||||
server[field] = s
|
||||
|
||||
# default value
|
||||
if "address" not in server or not server["address"]:
|
||||
server["address"] = server["ip"]
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@@ -404,11 +420,7 @@ def finishRequestAsync(server):
|
||||
|
||||
|
||||
def asyncFinishThread(server):
|
||||
checkAddress = False
|
||||
if not "address" in server or not server["address"]:
|
||||
server["address"] = server["ip"]
|
||||
else:
|
||||
checkAddress = True
|
||||
checkAddress = server["ip"] != server["address"]
|
||||
|
||||
try:
|
||||
info = socket.getaddrinfo(server["address"],
|
||||
@@ -416,14 +428,19 @@ def asyncFinishThread(server):
|
||||
type=socket.SOCK_DGRAM,
|
||||
proto=socket.SOL_UDP)
|
||||
except socket.gaierror:
|
||||
app.logger.warning("Unable to get address info for %s." % (server["address"],))
|
||||
err = "Unable to get address info for %s" % server["address"]
|
||||
app.logger.warning(err)
|
||||
errorTracker.put(getErrorPK(server), err)
|
||||
return
|
||||
|
||||
if checkAddress:
|
||||
addresses = set(data[4][0] for data in info)
|
||||
if not server["ip"] in addresses:
|
||||
app.logger.warning("Invalid IP %s for address %s (address valid for %s)."
|
||||
% (server["ip"], server["address"], addresses))
|
||||
err = "Requester IP %s does not match host %s" % (server["ip"], server["address"])
|
||||
if isDomain(server["address"]):
|
||||
err += " (valid: %s)" % " ".join(addresses)
|
||||
app.logger.warning(err)
|
||||
errorTracker.put(getErrorPK(server), err)
|
||||
return
|
||||
|
||||
geo = geoip_lookup_continent(info[-1][4][0])
|
||||
@@ -432,12 +449,16 @@ def asyncFinishThread(server):
|
||||
|
||||
server["ping"] = serverUp(info[0])
|
||||
if not server["ping"]:
|
||||
app.logger.warning("Server %s:%d has no ping."
|
||||
% (server["address"], server["port"]))
|
||||
err = "Server %s port %d did not respond to ping" % (server["address"], server["port"])
|
||||
if isDomain(server["address"]):
|
||||
err += " (tried %s)" % info[0][4][0]
|
||||
app.logger.warning(err)
|
||||
errorTracker.put(getErrorPK(server), err)
|
||||
return
|
||||
|
||||
# success!
|
||||
errorTracker.remove(getErrorPK(server))
|
||||
del server["action"]
|
||||
|
||||
serverList.update(server)
|
||||
|
||||
|
||||
@@ -561,19 +582,50 @@ class ServerList:
|
||||
self.save()
|
||||
|
||||
|
||||
class ErrorTracker:
|
||||
VALIDITY_TIME = 600
|
||||
|
||||
def __init__(self):
|
||||
self.table = {}
|
||||
self.lock = RLock()
|
||||
|
||||
def put(self, k, info):
|
||||
with self.lock:
|
||||
self.table[k] = (time.monotonic() + ErrorTracker.VALIDITY_TIME, info)
|
||||
|
||||
def remove(self, k):
|
||||
with self.lock:
|
||||
self.table.pop(k, None)
|
||||
|
||||
def get(self, k):
|
||||
with self.lock:
|
||||
e = self.table.get(k)
|
||||
if e and e[0] >= time.monotonic():
|
||||
return e[1]
|
||||
|
||||
def cleanup(self):
|
||||
with self.lock:
|
||||
now = time.monotonic()
|
||||
table = {k: e for k, e in self.table.items() if e[0] >= now}
|
||||
self.table = table
|
||||
|
||||
|
||||
class PurgeThread(Thread):
|
||||
def __init__(self):
|
||||
Thread.__init__(self)
|
||||
self.daemon = True
|
||||
Thread.__init__(self, daemon=True)
|
||||
def run(self):
|
||||
while True:
|
||||
time.sleep(60)
|
||||
serverList.purgeOld()
|
||||
errorTracker.cleanup()
|
||||
|
||||
|
||||
# Globals / Startup
|
||||
|
||||
serverList = ServerList()
|
||||
|
||||
errorTracker = ErrorTracker()
|
||||
|
||||
PurgeThread().start()
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user