diff -r 000000000000 -r 676905a3b03c dejsem.1.5/python/dejsem.pycharm/node.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dejsem.1.5/python/dejsem.pycharm/node.py Wed Nov 27 09:50:16 2019 +0100 @@ -0,0 +1,445 @@ +# coding=utf8 + +import sys, os, ssl, time, socket, errno, signal +from Crypto.Cipher import DES3 +from d import D +from parms import Parms + + +class Node(): + + class AllPortsBusy(Exception): + """všechny TCP porty pro server longtasks nebo pro peering jsou obsazeny""" + + blocking = True # select mode zatím není implementovaný + useSSLContext = False + ctx = None + UDPbroadcastGO = False + + def __init__(self, d, chan=Parms.sslchannel, host=Parms.srvhost, port=None, conn=True, tryPort=True, peering=False): + self._issl = Parms.ssl + self._chan = chan + self._bindhost = host + self._baseport = Parms.baseport + (self._chan * 10) + (0 if self._issl else 1) + self._minport = self._baseport + 1 + self._maxport = self._baseport + 9 + self._baseid = "netnode {}SSL".format("" if self._issl else "non") + self.d = D("{} {}".format(d.debid, self._baseid)) + self._srv_side = None + self._UDPpasswd = "heslo" + self._UDPbroadcast_addr = Parms.broadcast + self._UDPbroadcast_port = Parms.udpport + self._UDP_key = "PEER_IP" + self._UDPbroadcastGO = False + self.sslContext() + if conn: # TCP connect + if peering: + host, port = self.get_peerport() + self.conn(host=host, port=port) + else: # socket bind + if tryPort: # hledej volný port + self.bindtrynext(self._bindhost) + if peering: self.send_peerport() + else: # zkus bind a případně čekej na uvolnění + self.bindwait(self._bindhost) + + + def get(self, size): + try: + if self.d.ll(5): self.d.log("get data from scfile...") + data = self._scfile.read(size) + if self.d.ll(5): self.d.log("{} bytes read".format(len(data))) + return data + except Exception as e: + self.d.abend("read from socket", e) + return -1 + + + def genget(self, size=-1): + rest = size + while rest != 0: + n = rest if 0 < rest < Parms.bufSize else Parms.bufSize + data = self.get(n) + r = len(data) + if r < 1: break + rest = rest - r + yield data + + def getnum(self): + b = self._scfile.read(12).decode() + num = int(b) if b else -2 # -2 = EOD, -1 = directory, 0 and higher = data size + if self.d.ll(5): self.d.log("getnum, got {:012d} (-2 means EOD)".format(num)) + return num + + def getstr(self, decode = True): + lb = self.getnum() + if lb < 1: + return "" + else: + _data = self._scfile.read(int(lb)) + return _data.decode() if decode else _data + + def getfn(self): + return self.getstr() + + def getcmd(self): + try: + return self._scfile.read(8).decode().rstrip('_') + except Exception as e: + if isinstance(e, socket.timeout): + if self.d.ll(4): self.d.log("getcmd timeout") + else: + self.d.log("I/O err: {}".format(e)) + return "" + + def receive_dir(self, fp, size, timestamp): + os.makedirs(fp, exist_ok=True) + os.utime(fp, (timestamp, timestamp)) + return True + + def receive_file(self, fp, size, timestamp, counter=None): + if os.path.dirname(fp): os.makedirs(os.path.dirname(fp), exist_ok=True) + tempfp = fp + ".dejsem.partX" + with open(tempfp, mode='w+b') as f: + for data in self.genget(size = size): + if counter: counter.update(len(data)) + f.write(data) + if os.path.getsize(tempfp) == size: + os.rename(tempfp, fp) + os.utime(fp, (timestamp, timestamp)) + return True + + def receive_stream(self, fp, size, counter=None): + if os.path.dirname(fp): os.makedirs(os.path.dirname(fp), exist_ok=True) + tempfp = fp + ".{}.partX".format(Parms.applName) + with open(tempfp, mode='w+b') as f: + for data in self.genget(size = size): + if counter: counter.update(len(data)) + f.write(data) + if os.path.getsize(tempfp) == size: + os.rename(tempfp, fp) + return True + + def put(self, data): + if self.d.ll(5): self.d.log("PUT: data len={}, sending...".format(len(data))) + try: + l = self._scfile.write(data) + if self.d.ll(5): self.d.log("PUT: data len={}, sent".format(l)) + self._scfile.flush() + except Exception as e: + self.d.abend("send err", e) + return False + return True + + def genput(self): + try: + while True: + data = yield None + self.put(data) + except Exception as e: + self.d.abend("write to socket", e) + raise e + finally: + self._scfile.flush() + + def sendEOD(self): + self.putnum(0) + + def putnum(self, n): + if self.d.ll(5): self.d.log("putnum, num={:012d}".format(n)) + self._scfile.write(bytes("{:012d}".format(n), "utf8")) + self._scfile.flush() + + def putstr(self, fn): + b = bytes(str(fn), "utf8") + self.putnum(len(b)) + if self.d.ll(5): self.d.log("putstr, string={}".format(fn)) + self._scfile.write(b) + self._scfile.flush() + + def putcmd(self, act): + if self.d.ll(3): self.d.log("action: " + act) + # self._node.payload.data = bytes("{}".format(act), "utf8") + self.put(bytes("{}".format(act.ljust(8, '_')), "utf8")) + + def sendport(self, port): + """send dynamically allocated port to client""" + self.putnum(port) + + def putfileinfo(self, fp, relfp): + if self.d.ll(5): self.d.log("putfileinfo fp={}, relfp={}...".format(fp, relfp)) + self.putstr(relfp) + size = os.path.getsize(fp) if os.path.isfile(fp) else -1 + self.putnum(size) + timestamp = int(os.path.getmtime(fp)) if os.path.exists(fp) else 0 + self.putnum(timestamp) + if self.d.ll(4): self.d.log("fileinfo sent: fn={}, size={}, timestamp={}".format(relfp, size, timestamp)) + + def digest(self): + return self.data if len(self.data) < 24 else self.data[0:8].decode() + "--------" + self.data[-8:].decode() + + def sslContext(self): + if self._issl: + if Node.useSSLContext: + if not Node.ctx: + if self.d.ll(4): self.d.log( + "setting SSL context: certfile={}, capath={}...".format(Parms.sslCert, Parms.sslCAPath)) + try: + Node.ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) # PROTOCOL_SSLv23 + Node.ctx.verify_mode = ssl.CERT_REQUIRED # CERT_REQUIRED | CERT_OPTIONAL | CERT_NONE + Node.ctx.load_cert_chain(Parms.sslCert) + Node.ctx.load_verify_locations(None, Parms.sslCAPath) + except ssl.SSLError as e: + self.d.abendHard("SSL context", e) + + def getssc(self): + try: + ssc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ssc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + except Exception as e: + self.d.abendHard("ssc alloc", e) + if self._issl: + try: + if Node.useSSLContext: + ssc = Node.ctx.wrap_socket(ssc, server_side=True) + else: + ssc = ssl.wrap_socket( + ssc, + certfile=Parms.sslCert, + ca_certs=Parms.sslCAPath, + server_side=True, + cert_reqs=ssl.CERT_REQUIRED, + ssl_version=ssl.PROTOCOL_TLSv1) + except ssl.SSLError as e: + self.d.abendHard("ssc SSL wrap", e) + return ssc + + def bindwait(self, host): + port = self._baseport + self.d.log("binding to {}:{}".format(host, port)) + ssc = self.getssc() + tries = 0 + while True: + try: + ssc.bind((host, port)) + break + except Exception as e: + if e.strerror == "Address already in use": + if not tries: self.d.log("Address {}:{} already in use, waiting 10 secs...".format(host, port)) + tries = tries + 1 if tries < 77 else 0 + try: + time.sleep(10) + except KeyboardInterrupt: + raise + continue + self.d.abendHard("bind", e) + except KeyboardInterrupt: + raise + ssc.listen(1) + if self.d.ll(2): self.d.log("bound to {}:{}".format(host, port)) + self._ssc = ssc + self.port = port + return ssc + + def bindtrynext(self, host): + for port in range(self._minport, self._maxport + 1): + if self.d.ll(4): self.d.log("trying to bind to {}:{}...".format(host, port)) + try: + ssc = self.getssc() + ssc.bind((host, port)) + ssc.listen(1) + break + except Exception as e: + if e.strerror == "Address already in use": + if port < self._maxport: + continue + raise Node.AllPortsBusy + self.d.abend("bind", e) + if self.d.ll(2): self.d.log("bound to {}:{}".format(host, port)) + self._ssc = ssc + self.port = port + return (ssc, port) + + def send_peerport(self): + """UDP broadcast host:port pair for peer""" + ipport = "{:012d}{}{:012d}{}{:012d}".format(len(self._UDP_key), self._UDP_key, len(self._bindhost), self._bindhost, self.port) + + c = DES3.new(self.rawKey(self._UDPpasswd, 24), DES3.MODE_ECB) + data = ipport.encode() + enc = c.encrypt(data + b' ' * (8 - len(data) % 8)) + if self.d.ll(4): self.d.log("len=%d, enc=[%s]" % (len(enc), enc.hex())) + s = socket.socket(type=socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + a = (self._UDPbroadcast_addr, self._UDPbroadcast_port) + if self.d.ll(4): self.d.log("start udp sending to {}:{}: {}".format(self._UDPbroadcast_addr, Parms.udpport, data.decode())) + + pid = os.fork() + if pid: self._UDPbroadcastPID = pid + else: + self.UDPbroadcastGO = True + signal.signal(signal.SIGHUP, self.UDPstop) + signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGHUP}) + retries = 777 + while retries > 0 and self.UDPbroadcastGO: + s.sendto(enc, a) + time.sleep(1) + retries -= 1 + sys.exit(0) + + def get_peerport(self): + """get peer host:port pair broadcasted by peer via UDP""" + c = DES3.new(self.rawKey(self._UDPpasswd, 24), DES3.MODE_ECB) + s = socket.socket(type=socket.SOCK_DGRAM) + a = ('', self._UDPbroadcast_port) + if self.d.ll(4): self.d.log("binding to udp-port {}:{}".format(a[0], a[1])) + s.bind(a) + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + while True: + (dataBytes, (ip, port)) = s.recvfrom(512) + try: # ignore bad dgrams + data = c.decrypt(dataBytes).decode().strip() if Parms.ssl else dataBytes.decode() + except: continue + if self.d.ll(5): self.d.log("datalen={}, data={}".format(len(data), data)) + strlen = int(data[:12]) + key = data[12:12+strlen] + if not key == self._UDP_key: continue + data = data[12+strlen:] + strlen = int(data[:12]) + host = data[12:12+strlen] + port = int(data[12+strlen:]) + if self.d.ll(4): self.d.log("peer listening on {}:{}".format(host, port)) + return (host, port) + + def rawKey(self, passwd, keylen): + key = b'' + while len(key) < keylen: + key = key + passwd.encode() + return key[:keylen] + + def UDPstop(self, sign, frame): + self.UDPbroadcastGO = False + + def UDPsignalHUP(self): + os.kill(self._UDPbroadcastPID, signal.SIGHUP) # stop UDP broadcast + + def acc(self, acc_TO=Parms.peer_accept_timeout): + if self.d.ll(4): self.d.log("accepting on {} ...".format(self.port)) + self._ssc.settimeout(acc_TO) + try: + self._sc, (froma, fromp) = self._ssc.accept() + except KeyboardInterrupt: + if self.d.ll(4): self.d.log("KeyboardInterrupt") + raise + except Exception as e: + self.d.abend("accept", e) + return False + # fileno = self._sc.fileno() + if self.d.ll(2): self.d.log("conn request on {}SSL port {} from {}:{}" + .format("" if self._issl else "non", self.port, froma, fromp)) + if Node.blocking: + self._sc.settimeout(Parms.blockTimeout) + else: # select mode není zatím implementovaný + self._srv_side[self._sc] = self._sc + if self.d.ll(3): self.d.log("srv side={}".format(*(sc.fileno() for sc in self._srv_side.values()))) + accepted = False + commonName = "nonSSL" + certSubject = {} + if self._chan > 0 and self._issl: + certSubject.update(i for (i,) in self._sc.getpeercert()['subject']) + self.d.log("client certificate subject:", certSubject, sev=4) + if "commonName" in certSubject: commonName = certSubject["commonName"] + if commonName == "{:02d}".format(self._chan): accepted = True + # alternativa + # for ((key, value),) in sc.getpeercert().get("subject"): + # if key == "commonName": + # commonName = value + # if commonName == "{:02d}".format(self._chan): accepted = True + else: + accepted = True + if self.d.ll(2): self.d.log("client {} {}".format(certSubject["commonName"], "accepted" if accepted else "rejected")) + try: + self._scfile = self._sc.makefile("rwb") + except Exception as e: + self.d.abendMsg("socket-makefile", e=e) + self.close_sc() + return False + if accepted: + try: + if self.d.ll(4): self.d.log("confirming accept") + self._scfile.write(b"ACCEPTED") + self._scfile.flush() + return True + except Exception as e: + self.d.abendMsg("send confirm", e=e) + self.close_sc() + return False + else: + self._scfile.write(b"REJECTED") + self.close_sc() + return False + + def conn(self, host=Parms.srvhost, port=None): + if not port: port = self._baseport + if self.d.ll(4): self.d.log("connecting to {}:{}...".format(host, port)) + try: + self._sc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + except Exception as e: + self.d.abend("socket alloc", e) + if self._issl: + if self.d.ll(4): self.d.log("sc SSL wrap, homedir={}, certfile={}, ca_certs={}" + .format(os.getcwd(), Parms.sslCert, Parms.sslCAPath)) + try: + if Node.useSSLContext: + self._sc = Node.ctx.wrap_socket(self._sc) + else: + self._sc = ssl.wrap_socket( + self._sc, + certfile=Parms.sslCert, + ca_certs=Parms.sslCAPath, + cert_reqs=ssl.CERT_REQUIRED, + ssl_version=ssl.PROTOCOL_TLSv1) + except Exception as e: + self.d.abend("sc SSL wrap", e) + retry = Parms.connThreshold + connected = False + while not connected and retry > 0: + try: + self._sc.connect((host, port)) + connected = True + except Exception as e: + if e.errno == errno.ECONNREFUSED: + retry = retry - 1 + time.sleep(Parms.connTimeout) + else: + self.d.abend("connect to {}".format(host), e) + if retry == 0: self.d.abend("connection to {} refused, threshold {} reached".format(host, Parms.connThreshold), None) + fileno = self._sc.fileno() + if Node.blocking: self._sc.settimeout(Parms.blockTimeout) + try: + self._scfile = self._sc.makefile("rwb") + except Exception as e: + self.d.abend("connect makefile", e) + try: + if self._scfile.read(8) != b"ACCEPTED": self.d.abend("connection not accepted by server", None) + except Exception as e: + self.d.abend("read socket", e) + if self.d.ll(2): self.d.log("connected to {}:{} after {} retries, via fd {}" + .format(host, port, Parms.connThreshold - retry, fileno)) + + def close_sc(self): + if self.d.ll(4): self.d.log("closing socket...") + try: + if hasattr(self, '_scfile'): self._scfile.close() + if hasattr(self, '_sc'): self._sc.close() + except Exception as e: + self.d.abend("closing socket", e) + + def close_ssc(self): + if self.d.ll(4): self.d.log("closing SSL socket...") + if hasattr(self, "_ssc"): + try: self._ssc.close() + except Exception as e: self.d.abend("closing SSL socket", e) + + def close(self): + self.close_sc() + self.close_ssc()