diff -r 000000000000 -r 676905a3b03c dejsem.1.5/python/dejsem.pycharm/client.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dejsem.1.5/python/dejsem.pycharm/client.py Wed Nov 27 09:50:16 2019 +0100 @@ -0,0 +1,398 @@ +# coding=utf8 + +import socket, os, sys, time, signal, subprocess +from d import D +from parms import Parms +from node import Node +from counter import Counter + +# síťové operace klienta +# - connect to remote server na base_port+CCx +# - connect to remote peer na host:port, které dostane ze serveru na base_port+CC0 operací GETPEER +# - listen for peer on base_port+CCx - konkrétní host:port zveřejňuje peer na serveru na base_port+CC0 operací SETPEER +# - formalizace čísla portu je ovšem nutná jen když oba peers běží na tomtéž stroji + +class Client(): + + def __init__(self, d): + + self.d = D("client".format(d.debid)) + + Parms.clientMode = True + + self._orig = Parms.orig # origin data paths + self._dest = Parms.dest # destination data path + + self._chan = Parms.sslchannel + + if self.d.ll(1): self.d.log("pgm={}, homedir={}, action={}, channel={}, debug={}" + .format(sys.argv[0], Parms.client_homedir, Parms.action, self._chan, Parms.debugLevel)) + if self.d.ll(3): self.d.log("CE path: {}, CA path: {}".format(Parms.sslCert, Parms.sslCAPath)) + + act = Parms.action + if not act: + return + elif act == "PUSHCLIP": # from cliboard to remote server + self.pushclip("clipboard") + elif act == "PUSHPRIM": # from X primary to remote server + self.pushclip("primary") + elif act == "PUSHFILE": # from local files to remote server + self.pushfile() + elif act == "PUSHPEER": # from local files to remote peer + self.pushpeer() + elif act == 'PULLCLIP': # from remote server to cliboard + self.pullclip() + elif act == 'PULLHIST': # synchronize local clipboard history from server + self.pullhist() + elif act == 'PULLFILE': # from remote server files to local + self.pullfile() + elif act == 'PULLLIST': # filelist of server dir + self.pulllist() + elif act == "PULLPEER": # from remote peer to local + self.pullpeer() + elif act == "SRVHACK": + self.srv_hack() + elif act == "HACK": + self.hack() + else: + self.d.log("ABEND, unknown command {}".format(act)) + if hasattr(self, "_node"): + self._node.close_sc() + self._node.close_ssc() + + def pushclip(self, buffer): + p = subprocess.Popen(["xclip", "-o", "-selection", buffer], stdout=subprocess.PIPE) + if not p.stdout.read().lstrip(): # nejdřív ověřit, že na clipboardu něco visí + self.d.abend("{} buf is empty".format(buffer), None) + self._node = Node(self.d, conn=True) # establish server cmd session + self._node.putcmd("PUSHCLIP") + if self.d.ll(3): self.d.log("push clipboard starting...") + p = subprocess.Popen(["xclip", "-o", "-selection", buffer], stdout=subprocess.PIPE) + self._node.put(p.stdout.read()) + p.wait() + if self.d.ll(2): self.d.log("push clipboard end") + + def longtask(self): + # příkaz serveru k otevření paralelního portu pro dlouhý přenos + # dostanu číslo portu, uvolním příkazový port a otevřu conn na nový port + self._node.putcmd("LONGTASK") + port = self._node.getnum() + if port == 0: + self.d.abend("all server net ports are busy", None) + self._node.close() # free server base port 0 + self._node = Node(self.d, port=port) + + def pushfile(self): + if not self._orig: + self.d.abendMsg("nothing specified") + else: + if not self._dest and len(self._orig) > 1: # je-li na vstupu více jmen, tak poslední je destdir + self._dest = os.path.normpath(self._orig[-1]) + if self._dest.startswith('/'): self._dest = self._dest[1:] # výstup jenom relativně + del self._orig[-1] + if not any(os.path.exists(p) for p in self._orig): + self.d.abendMsg("{} not found".format(self._orig)) + else: + self._node = Node(self.d, conn=True) + self.longtask() # přechod na datový port + self._counter = Counter(self.d, self.batchSize()) # start transfer progress display + self._node.putcmd("PUSHFILE") + for fp in self._orig: + if self.d.ll(3): self.d.log("pushfile '{}' --> '{}' starting...".format(fp, self._dest)) + if os.path.exists(fp): + normfp = os.path.normpath(fp) + self.pushfile_recurse(normfp, os.path.dirname(normfp)) + else: + self.d.warn("pushfile: {} not found".format(fp)) + self._counter.stop() + self.d.log("pushfile end", sev=2) + + def pushfile_recurse(self, fp, prefix): + if self.d.ll(4): self.d.log("fp={}, relative fp={}".format(fp, os.path.relpath(fp, prefix))) + dest = os.path.join(self._dest, os.path.relpath(fp, prefix)) + if os.path.isdir(fp): + for cwd, void, entries in os.walk(fp, topdown=True): + for entry in entries: + if self.d.ll(4): self.d.log("cwd={}, entry={}".format(cwd, entry)) + self.pushfile_recurse(os.path.join(cwd, entry), prefix) + self._node.putfileinfo(cwd, os.path.join(self._dest, os.path.relpath(cwd, prefix))) + self._node.putfileinfo(fp, dest) + else: + self._node.putfileinfo(fp, dest) + with open(fp, mode='rb') as f: + g = self._node.genput() + g.send(None) + data = f.read(Parms.bufSize) + while data: + self._counter.update(len(data)) + if self.d.ll(5): self.d.log("push file: len read from file={}".format(len(data))) + try: + g.send(data) + data = f.read(Parms.bufSize) + except Exception: + break + g.close() + + def pullclip(self): + self._node = Node(self.d, conn=True) # establish server cmd session + self._node.putcmd("PULLCLIP") + self._node.putstr("") # empty str means LAST entry in clipboard storage on server + if self.d.ll(3): self.d.log("pull clipboard starting...") + p = subprocess.Popen(["xclip", "-i", "-selection", "clipboard"], stdin=subprocess.PIPE) + for data in self._node.genget(size = self._node.getnum()): + p.stdin.write(data) + p.stdin.close() + p.wait() + self._node.close_sc() + if self.d.ll(2): self.d.log("pull clipboard end") + + def pullhist(self): + """ + synchronizace historie clipboardu se serverem + - entries z clipboardu se drží na serveru v jednotlivých souborech, které se synchronizují do lokálního dir + - po synchrnizaci se vytvoří indexový soubor seřazený podle timestampů + """ + histdir = Parms.client_histdir + self._node = Node(self.d, conn=True) # establish server cmd session + self._node.putcmd("PULLHIST") + self.d.log("synchronizing clipboard history...") + os.makedirs(histdir, mode=0o755, exist_ok=True) + os.chdir(histdir) + entries = dict() + toget = dict() + fn = self._node.getfn() + while len(fn) > 0: # inventarizace serveru + if self.d.ll(5): self.d.log("fn=" + fn) + # entries obsahují prvních 80 bytů z clipboard entry, délku clipboard entry a timestamp + # entries mohou být binární i textové, takže se nedekódují + entries[fn] = (self._node.getstr(decode=False), self._node.getnum(), self._node.getnum()) + if not os.path.exists(fn): + toget[fn] = entries[fn] + if self.d.ll(4): self.d.log("fn {} doesn't exists, toget[fn]={}, toget size={}".format(fn, toget[fn], len(toget))) + fn = self._node.getfn() + self.d.log("toget={}, toget size={}".format(toget.keys(), len(toget)), sev=4) + if len(toget): # synchronizace + for fn in toget.keys(): + self._node.putcmd("PULLCLIP") + self._node.putstr(fn) # send requested entry name + with open(fn, mode='wb') as f: + for data in self._node.genget(size = self._node.getnum()): + f.write(data) + timestamp = toget[fn][2] + os.utime(fn, (timestamp, timestamp)) + self._node.close_sc() + p = subprocess.Popen(["sort", "-k2", "-r"], stdin=subprocess.PIPE, stdout=open('.index', mode='w'), universal_newlines=True) + for fn in os.listdir(): # indexing + if fn == ".index": continue + digest = open(fn, mode="rb").read(80).replace(b'\n', b' ') + try: digest = digest.decode() # to, co nepůjde dekódovat, necháme být + except UnicodeDecodeError: pass + size = os.path.getsize(fn) + timestamp = int(os.path.getmtime(fn)) + p.stdin.write("{} {} {: 6d} {}\n".format(fn, time.strftime("%Y/%m/%d.%H:%M:%S", time.gmtime(timestamp)), size, digest)) + p.stdin.close() + p.wait() + if p.returncode == 0: + p = subprocess.Popen(["mc", histdir]) + p.wait() + if self.d.ll(5): self.d.log("clipboard history sync finished") + + + def pullfile(self): + ldp = len(Parms.datapaths) + if ldp: + if ldp > 1: # alespoň 2 argumenty: poslední arg je destination dir, ostatní args jsou požadavky + self._orig = Parms.datapaths[:ldp-1] + self._dest = Parms.datapaths[-1] + else: # jedinný arg je požadavek, destination dir podle env DEST + self._orig = Parms.datapaths[:1] + self._dest = Parms.dest + else: # bez argumentů: požadavek i destinace podle env + self._orig = Parms.orig + self._dest = Parms.dest + if not self._orig: + self.d.abend("no filename specified, ABEND") + else: + self._node = Node(self.d, conn=True) + size, fnum, dnum = (0, 0, 0) + for orig in self._orig: # zjistíme celkovou velikost dávky pro průběžné sledování + orig = os.path.normpath(orig) + self._node.putcmd("RECKON") + self._node.putstr(orig) + size += self._node.getnum() + fnum += self._node.getnum() + dnum += self._node.getnum() + self.longtask() + self.d.log("pulling {} bytes in {} files and {} dirs...".format(size, fnum, dnum), sev=3) + self._counter = Counter(self.d, size) + for orig in self._orig: # vlastní download dávky + orig = os.path.normpath(orig) + self._node.putcmd("PULLFILE") + if self.d.ll(4): self.d.log("pull of '{}' starting...".format(orig)) + self._node.putstr(orig) + fn = self._node.getfn() + while len(fn) > 0: + fp = os.path.join(self._dest, fn) + size = self._node.getnum() + timestamp = self._node.getnum() + if self.d.ll(4): self.d.log("pull to '{}, dir={}'...".format(fp, size == -1)) + if size < 0: + self._node.receive_dir(fp, size, timestamp) + else: + self._node.receive_file(fp, size, timestamp, counter=self._counter) + fn = self._node.getfn() + self._counter.stop() + if self.d.ll(2): self.d.log("pull file end") + + def pulllist(self): + """ + po odeslání příkazu se načítají údaje o souborech ve tvaru + + """ + self._node = Node(self.d, conn=True) + self._node.putcmd("PULLLIST") + self._node.putstr(os.path.normpath(self._orig[0] if self._orig else ".")) + fn = self._node.getfn() + while fn: + size = self._node.getnum() + timestamp = time.asctime(time.localtime(self._node.getnum())) + print("{: 12d} {:24} {}".format(size, timestamp, fn)) + fn = self._node.getfn() + if self.d.ll(5): self.d.log("fn='{}'".format(str(fn))) + + def pushpeer(self): + if not self._orig: + self.d.abendMsg("nothing specified") + elif not any(os.path.exists(p) for p in self._orig): + self.d.abendMsg("{} not found".format(self._orig)) + else: + self._node = Node(self.d, conn=True, peering=True) + size = self.batchSize() + self.sendBatchSize(size) # poskytneme partnerovi údaje o velikosti odesílané dávky + self._counter = Counter(self.d, size) + try: + for fp in self._orig: + if self.d.ll(3): self.d.log("pushpeer: from={}".format(fp)) + if os.path.exists(fp): + normfp = os.path.normpath(fp) + self.pushpeer_recurse(normfp, os.path.dirname(normfp)) + else: + self.d.warn("'{}' not found".format(fp)) + finally: + self._node.sendEOD() # end of batch + self._counter.stop() + + def pushpeer_recurse(self, fp, prefix): + if self.d.ll(4): self.d.log("pushpeer recurse: fp={}, relative fp={}".format(fp, os.path.relpath(fp, prefix))) + if os.path.isdir(fp): + for cwd, void, entries in os.walk(fp, topdown=True): + for entry in entries: + self.pushpeer_recurse(os.path.join(cwd, entry), prefix) + self._node.putfileinfo(cwd, os.path.relpath(cwd, prefix)) + self._node.putfileinfo(fp, os.path.relpath(fp, prefix)) + else: + self._node.putfileinfo(fp, os.path.relpath(fp, prefix)) + with open(fp, mode='rb') as f: + g = self._node.genput() + g.send(None) + data = f.read(Parms.bufSize) + while data: + self._counter.update(len(data)) + try: + g.send(data) + data = f.read(Parms.bufSize) + except Exception: + break + g.close() + if self.d.ll(4): self.d.log("PUSHPEER: entry {} sent".format(fp)) + + def sendBatchSize(self, size): + """ + odeslání informace o velikosti připravené dávky dat + informace se odešle formou informace o souboru (filename, filesize, timestamp) + """ + if self.d.ll(4): self.d.log("sending batch size to peer...") + self._node.putstr("dummy fn for batch size") + self._node.putnum(size) + self._node.putnum(0) + + def pullpeer(self): + if not Parms.bindhost: + self.d.abend("local host addr for peering not specified", None) + try: + self._node = Node(self.d, host=Parms.bindhost, tryPort=True, conn=False, peering=True) + except Node.AllPortsBusy as e: + self.d.abend("all predefined net ports are busy", None) + try: + self.d.log("accepting...", sev=4) + accepted = self._node.acc(acc_TO=Parms.peer_accept_timeout) + self.d.log("peer {}accepted".format("not " if not accepted else ""), sev=4) + if not accepted: return + except Exception as e: + self._node.close_ssc() + self.d.abend("peer pull accept", None) + return + finally: + self._node.UDPsignalHUP() # stop UDP broadcast + self.d.log("get batch size from peer (dummy fn)", sev=4) + self._node.getfn() # dummy fn in batch size info + batchsize = self._node.getnum() + self.d.log("batchsize={}".format(batchsize), sev=4) + self._node.getnum() # dummy timestamp + counter = Counter(self.d, batchsize) + fn = self._node.getfn() + while fn: # receive batch of file/dir objects + self.d.log("fn={}".format(fn), sev=4) + fp = os.path.join(self._dest, fn) + size = self._node.getnum() + timestamp = self._node.getnum() + if self.d.ll(4): self.d.log("pulling from peer to '{}'...".format(fp)) + if size < 0: + if not self._node.receive_dir(fp, size, timestamp): + self._node.close_sc() + else: + self._node.receive_file(fp, size, timestamp, counter=counter) + fn = self._node.getfn() + counter.stop() + self._node.close_sc() + self._node.close_ssc() + + def close_sc(self): + self._node.close_sc() + + def dirsize(self, fp): + p = subprocess.Popen(("du", "-sb", fp), stdout=subprocess.PIPE) + p.wait() + return int(p.stdout.readlines()[0].decode().split("\t")[0]) if p.returncode == 0 else -1 + + def batchSize(self): + size = 0 + for fp in self._orig: + if os.path.exists(fp): + size += self.dirsize(fp) if os.path.isdir(fp) else os.path.getsize(fp) + return size + + def srv_hack(self): + self._node = Node(self.d, conn=True) + for orig in self._orig: + self._node.putcmd("RECKON") + self._node.putstr(orig) + wholesize = self._node.getnum() + fnum = self._node.getnum() + dnum = self._node.getnum() + self.d.log("reckon: {}, {}, {}".format(wholesize, fnum, dnum)) + + def hack(self): + self._node = Node(self.d, conn=False, tryPort=False, port=1111, host='10.0.1.47') + self._node.acc(1) + + def hack_recurse(self, realfp, pref): + self.d.log("hack realfp={}, relfp={}".format(realfp, os.path.relpath(realfp, pref))) + if os.path.isdir(realfp): + for cwd, void, entries in os.walk(realfp, topdown=True): + for entry in entries: + self.hack_recurse2(os.path.join(cwd, entry), pref) + self.d.log("node.putfileinfo({}, {})".format(cwd, os.path.relpath(cwd, pref))) + self.d.log("node.putfileinfo({}, {})".format(realfp, os.path.relpath(realfp, pref))) + else: + self.d.log("file {} processing".format(realfp))