--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/dejsem.1.5/python/dejsem.pycharm/client.py Wed Nov 27 09:50:16 2019 +0100
@@ -0,0 +1,398 @@
+# coding=utf8
+
+import socket, os, sys, time, signal, subprocess
+from d import D
+from parms import Parms
+from node import Node
+from counter import Counter
+
+# síťové operace klienta
+# - connect to remote server na base_port+CCx
+# - connect to remote peer na host:port, které dostane ze serveru na base_port+CC0 operací GETPEER
+# - listen for peer on base_port+CCx - konkrétní host:port zveřejňuje peer na serveru na base_port+CC0 operací SETPEER
+# - formalizace čísla portu je ovšem nutná jen když oba peers běží na tomtéž stroji
+
+class Client():
+
+ def __init__(self, d):
+
+ self.d = D("client".format(d.debid))
+
+ Parms.clientMode = True
+
+ self._orig = Parms.orig # origin data paths
+ self._dest = Parms.dest # destination data path
+
+ self._chan = Parms.sslchannel
+
+ if self.d.ll(1): self.d.log("pgm={}, homedir={}, action={}, channel={}, debug={}"
+ .format(sys.argv[0], Parms.client_homedir, Parms.action, self._chan, Parms.debugLevel))
+ if self.d.ll(3): self.d.log("CE path: {}, CA path: {}".format(Parms.sslCert, Parms.sslCAPath))
+
+ act = Parms.action
+ if not act:
+ return
+ elif act == "PUSHCLIP": # from cliboard to remote server
+ self.pushclip("clipboard")
+ elif act == "PUSHPRIM": # from X primary to remote server
+ self.pushclip("primary")
+ elif act == "PUSHFILE": # from local files to remote server
+ self.pushfile()
+ elif act == "PUSHPEER": # from local files to remote peer
+ self.pushpeer()
+ elif act == 'PULLCLIP': # from remote server to cliboard
+ self.pullclip()
+ elif act == 'PULLHIST': # synchronize local clipboard history from server
+ self.pullhist()
+ elif act == 'PULLFILE': # from remote server files to local
+ self.pullfile()
+ elif act == 'PULLLIST': # filelist of server dir
+ self.pulllist()
+ elif act == "PULLPEER": # from remote peer to local
+ self.pullpeer()
+ elif act == "SRVHACK":
+ self.srv_hack()
+ elif act == "HACK":
+ self.hack()
+ else:
+ self.d.log("ABEND, unknown command {}".format(act))
+ if hasattr(self, "_node"):
+ self._node.close_sc()
+ self._node.close_ssc()
+
+ def pushclip(self, buffer):
+ p = subprocess.Popen(["xclip", "-o", "-selection", buffer], stdout=subprocess.PIPE)
+ if not p.stdout.read().lstrip(): # nejdřív ověřit, že na clipboardu něco visí
+ self.d.abend("{} buf is empty".format(buffer), None)
+ self._node = Node(self.d, conn=True) # establish server cmd session
+ self._node.putcmd("PUSHCLIP")
+ if self.d.ll(3): self.d.log("push clipboard starting...")
+ p = subprocess.Popen(["xclip", "-o", "-selection", buffer], stdout=subprocess.PIPE)
+ self._node.put(p.stdout.read())
+ p.wait()
+ if self.d.ll(2): self.d.log("push clipboard end")
+
+ def longtask(self):
+ # příkaz serveru k otevření paralelního portu pro dlouhý přenos
+ # dostanu číslo portu, uvolním příkazový port a otevřu conn na nový port
+ self._node.putcmd("LONGTASK")
+ port = self._node.getnum()
+ if port == 0:
+ self.d.abend("all server net ports are busy", None)
+ self._node.close() # free server base port 0
+ self._node = Node(self.d, port=port)
+
+ def pushfile(self):
+ if not self._orig:
+ self.d.abendMsg("nothing specified")
+ else:
+ if not self._dest and len(self._orig) > 1: # je-li na vstupu více jmen, tak poslední je destdir
+ self._dest = os.path.normpath(self._orig[-1])
+ if self._dest.startswith('/'): self._dest = self._dest[1:] # výstup jenom relativně
+ del self._orig[-1]
+ if not any(os.path.exists(p) for p in self._orig):
+ self.d.abendMsg("{} not found".format(self._orig))
+ else:
+ self._node = Node(self.d, conn=True)
+ self.longtask() # přechod na datový port
+ self._counter = Counter(self.d, self.batchSize()) # start transfer progress display
+ self._node.putcmd("PUSHFILE")
+ for fp in self._orig:
+ if self.d.ll(3): self.d.log("pushfile '{}' --> '{}' starting...".format(fp, self._dest))
+ if os.path.exists(fp):
+ normfp = os.path.normpath(fp)
+ self.pushfile_recurse(normfp, os.path.dirname(normfp))
+ else:
+ self.d.warn("pushfile: {} not found".format(fp))
+ self._counter.stop()
+ self.d.log("pushfile end", sev=2)
+
+ def pushfile_recurse(self, fp, prefix):
+ if self.d.ll(4): self.d.log("fp={}, relative fp={}".format(fp, os.path.relpath(fp, prefix)))
+ dest = os.path.join(self._dest, os.path.relpath(fp, prefix))
+ if os.path.isdir(fp):
+ for cwd, void, entries in os.walk(fp, topdown=True):
+ for entry in entries:
+ if self.d.ll(4): self.d.log("cwd={}, entry={}".format(cwd, entry))
+ self.pushfile_recurse(os.path.join(cwd, entry), prefix)
+ self._node.putfileinfo(cwd, os.path.join(self._dest, os.path.relpath(cwd, prefix)))
+ self._node.putfileinfo(fp, dest)
+ else:
+ self._node.putfileinfo(fp, dest)
+ with open(fp, mode='rb') as f:
+ g = self._node.genput()
+ g.send(None)
+ data = f.read(Parms.bufSize)
+ while data:
+ self._counter.update(len(data))
+ if self.d.ll(5): self.d.log("push file: len read from file={}".format(len(data)))
+ try:
+ g.send(data)
+ data = f.read(Parms.bufSize)
+ except Exception:
+ break
+ g.close()
+
+ def pullclip(self):
+ self._node = Node(self.d, conn=True) # establish server cmd session
+ self._node.putcmd("PULLCLIP")
+ self._node.putstr("") # empty str means LAST entry in clipboard storage on server
+ if self.d.ll(3): self.d.log("pull clipboard starting...")
+ p = subprocess.Popen(["xclip", "-i", "-selection", "clipboard"], stdin=subprocess.PIPE)
+ for data in self._node.genget(size = self._node.getnum()):
+ p.stdin.write(data)
+ p.stdin.close()
+ p.wait()
+ self._node.close_sc()
+ if self.d.ll(2): self.d.log("pull clipboard end")
+
+ def pullhist(self):
+ """
+ synchronizace historie clipboardu se serverem
+ - entries z clipboardu se drží na serveru v jednotlivých souborech, které se synchronizují do lokálního dir
+ - po synchrnizaci se vytvoří indexový soubor seřazený podle timestampů
+ """
+ histdir = Parms.client_histdir
+ self._node = Node(self.d, conn=True) # establish server cmd session
+ self._node.putcmd("PULLHIST")
+ self.d.log("synchronizing clipboard history...")
+ os.makedirs(histdir, mode=0o755, exist_ok=True)
+ os.chdir(histdir)
+ entries = dict()
+ toget = dict()
+ fn = self._node.getfn()
+ while len(fn) > 0: # inventarizace serveru
+ if self.d.ll(5): self.d.log("fn=" + fn)
+ # entries obsahují prvních 80 bytů z clipboard entry, délku clipboard entry a timestamp
+ # entries mohou být binární i textové, takže se nedekódují
+ entries[fn] = (self._node.getstr(decode=False), self._node.getnum(), self._node.getnum())
+ if not os.path.exists(fn):
+ toget[fn] = entries[fn]
+ if self.d.ll(4): self.d.log("fn {} doesn't exists, toget[fn]={}, toget size={}".format(fn, toget[fn], len(toget)))
+ fn = self._node.getfn()
+ self.d.log("toget={}, toget size={}".format(toget.keys(), len(toget)), sev=4)
+ if len(toget): # synchronizace
+ for fn in toget.keys():
+ self._node.putcmd("PULLCLIP")
+ self._node.putstr(fn) # send requested entry name
+ with open(fn, mode='wb') as f:
+ for data in self._node.genget(size = self._node.getnum()):
+ f.write(data)
+ timestamp = toget[fn][2]
+ os.utime(fn, (timestamp, timestamp))
+ self._node.close_sc()
+ p = subprocess.Popen(["sort", "-k2", "-r"], stdin=subprocess.PIPE, stdout=open('.index', mode='w'), universal_newlines=True)
+ for fn in os.listdir(): # indexing
+ if fn == ".index": continue
+ digest = open(fn, mode="rb").read(80).replace(b'\n', b' ')
+ try: digest = digest.decode() # to, co nepůjde dekódovat, necháme být
+ except UnicodeDecodeError: pass
+ size = os.path.getsize(fn)
+ timestamp = int(os.path.getmtime(fn))
+ p.stdin.write("{} {} {: 6d} {}\n".format(fn, time.strftime("%Y/%m/%d.%H:%M:%S", time.gmtime(timestamp)), size, digest))
+ p.stdin.close()
+ p.wait()
+ if p.returncode == 0:
+ p = subprocess.Popen(["mc", histdir])
+ p.wait()
+ if self.d.ll(5): self.d.log("clipboard history sync finished")
+
+
+ def pullfile(self):
+ ldp = len(Parms.datapaths)
+ if ldp:
+ if ldp > 1: # alespoň 2 argumenty: poslední arg je destination dir, ostatní args jsou požadavky
+ self._orig = Parms.datapaths[:ldp-1]
+ self._dest = Parms.datapaths[-1]
+ else: # jedinný arg je požadavek, destination dir podle env DEST
+ self._orig = Parms.datapaths[:1]
+ self._dest = Parms.dest
+ else: # bez argumentů: požadavek i destinace podle env
+ self._orig = Parms.orig
+ self._dest = Parms.dest
+ if not self._orig:
+ self.d.abend("no filename specified, ABEND")
+ else:
+ self._node = Node(self.d, conn=True)
+ size, fnum, dnum = (0, 0, 0)
+ for orig in self._orig: # zjistíme celkovou velikost dávky pro průběžné sledování
+ orig = os.path.normpath(orig)
+ self._node.putcmd("RECKON")
+ self._node.putstr(orig)
+ size += self._node.getnum()
+ fnum += self._node.getnum()
+ dnum += self._node.getnum()
+ self.longtask()
+ self.d.log("pulling {} bytes in {} files and {} dirs...".format(size, fnum, dnum), sev=3)
+ self._counter = Counter(self.d, size)
+ for orig in self._orig: # vlastní download dávky
+ orig = os.path.normpath(orig)
+ self._node.putcmd("PULLFILE")
+ if self.d.ll(4): self.d.log("pull of '{}' starting...".format(orig))
+ self._node.putstr(orig)
+ fn = self._node.getfn()
+ while len(fn) > 0:
+ fp = os.path.join(self._dest, fn)
+ size = self._node.getnum()
+ timestamp = self._node.getnum()
+ if self.d.ll(4): self.d.log("pull to '{}, dir={}'...".format(fp, size == -1))
+ if size < 0:
+ self._node.receive_dir(fp, size, timestamp)
+ else:
+ self._node.receive_file(fp, size, timestamp, counter=self._counter)
+ fn = self._node.getfn()
+ self._counter.stop()
+ if self.d.ll(2): self.d.log("pull file end")
+
+ def pulllist(self):
+ """
+ po odeslání příkazu se načítají údaje o souborech ve tvaru
+ <délka_fn><fn><file_size><file_timestamp>
+ """
+ self._node = Node(self.d, conn=True)
+ self._node.putcmd("PULLLIST")
+ self._node.putstr(os.path.normpath(self._orig[0] if self._orig else "."))
+ fn = self._node.getfn()
+ while fn:
+ size = self._node.getnum()
+ timestamp = time.asctime(time.localtime(self._node.getnum()))
+ print("{: 12d} {:24} {}".format(size, timestamp, fn))
+ fn = self._node.getfn()
+ if self.d.ll(5): self.d.log("fn='{}'".format(str(fn)))
+
+ def pushpeer(self):
+ if not self._orig:
+ self.d.abendMsg("nothing specified")
+ elif not any(os.path.exists(p) for p in self._orig):
+ self.d.abendMsg("{} not found".format(self._orig))
+ else:
+ self._node = Node(self.d, conn=True, peering=True)
+ size = self.batchSize()
+ self.sendBatchSize(size) # poskytneme partnerovi údaje o velikosti odesílané dávky
+ self._counter = Counter(self.d, size)
+ try:
+ for fp in self._orig:
+ if self.d.ll(3): self.d.log("pushpeer: from={}".format(fp))
+ if os.path.exists(fp):
+ normfp = os.path.normpath(fp)
+ self.pushpeer_recurse(normfp, os.path.dirname(normfp))
+ else:
+ self.d.warn("'{}' not found".format(fp))
+ finally:
+ self._node.sendEOD() # end of batch
+ self._counter.stop()
+
+ def pushpeer_recurse(self, fp, prefix):
+ if self.d.ll(4): self.d.log("pushpeer recurse: fp={}, relative fp={}".format(fp, os.path.relpath(fp, prefix)))
+ if os.path.isdir(fp):
+ for cwd, void, entries in os.walk(fp, topdown=True):
+ for entry in entries:
+ self.pushpeer_recurse(os.path.join(cwd, entry), prefix)
+ self._node.putfileinfo(cwd, os.path.relpath(cwd, prefix))
+ self._node.putfileinfo(fp, os.path.relpath(fp, prefix))
+ else:
+ self._node.putfileinfo(fp, os.path.relpath(fp, prefix))
+ with open(fp, mode='rb') as f:
+ g = self._node.genput()
+ g.send(None)
+ data = f.read(Parms.bufSize)
+ while data:
+ self._counter.update(len(data))
+ try:
+ g.send(data)
+ data = f.read(Parms.bufSize)
+ except Exception:
+ break
+ g.close()
+ if self.d.ll(4): self.d.log("PUSHPEER: entry {} sent".format(fp))
+
+ def sendBatchSize(self, size):
+ """
+ odeslání informace o velikosti připravené dávky dat
+ informace se odešle formou informace o souboru (filename, filesize, timestamp)
+ """
+ if self.d.ll(4): self.d.log("sending batch size to peer...")
+ self._node.putstr("dummy fn for batch size")
+ self._node.putnum(size)
+ self._node.putnum(0)
+
+ def pullpeer(self):
+ if not Parms.bindhost:
+ self.d.abend("local host addr for peering not specified", None)
+ try:
+ self._node = Node(self.d, host=Parms.bindhost, tryPort=True, conn=False, peering=True)
+ except Node.AllPortsBusy as e:
+ self.d.abend("all predefined net ports are busy", None)
+ try:
+ self.d.log("accepting...", sev=4)
+ accepted = self._node.acc(acc_TO=Parms.peer_accept_timeout)
+ self.d.log("peer {}accepted".format("not " if not accepted else ""), sev=4)
+ if not accepted: return
+ except Exception as e:
+ self._node.close_ssc()
+ self.d.abend("peer pull accept", None)
+ return
+ finally:
+ self._node.UDPsignalHUP() # stop UDP broadcast
+ self.d.log("get batch size from peer (dummy fn)", sev=4)
+ self._node.getfn() # dummy fn in batch size info
+ batchsize = self._node.getnum()
+ self.d.log("batchsize={}".format(batchsize), sev=4)
+ self._node.getnum() # dummy timestamp
+ counter = Counter(self.d, batchsize)
+ fn = self._node.getfn()
+ while fn: # receive batch of file/dir objects
+ self.d.log("fn={}".format(fn), sev=4)
+ fp = os.path.join(self._dest, fn)
+ size = self._node.getnum()
+ timestamp = self._node.getnum()
+ if self.d.ll(4): self.d.log("pulling from peer to '{}'...".format(fp))
+ if size < 0:
+ if not self._node.receive_dir(fp, size, timestamp):
+ self._node.close_sc()
+ else:
+ self._node.receive_file(fp, size, timestamp, counter=counter)
+ fn = self._node.getfn()
+ counter.stop()
+ self._node.close_sc()
+ self._node.close_ssc()
+
+ def close_sc(self):
+ self._node.close_sc()
+
+ def dirsize(self, fp):
+ p = subprocess.Popen(("du", "-sb", fp), stdout=subprocess.PIPE)
+ p.wait()
+ return int(p.stdout.readlines()[0].decode().split("\t")[0]) if p.returncode == 0 else -1
+
+ def batchSize(self):
+ size = 0
+ for fp in self._orig:
+ if os.path.exists(fp):
+ size += self.dirsize(fp) if os.path.isdir(fp) else os.path.getsize(fp)
+ return size
+
+ def srv_hack(self):
+ self._node = Node(self.d, conn=True)
+ for orig in self._orig:
+ self._node.putcmd("RECKON")
+ self._node.putstr(orig)
+ wholesize = self._node.getnum()
+ fnum = self._node.getnum()
+ dnum = self._node.getnum()
+ self.d.log("reckon: {}, {}, {}".format(wholesize, fnum, dnum))
+
+ def hack(self):
+ self._node = Node(self.d, conn=False, tryPort=False, port=1111, host='10.0.1.47')
+ self._node.acc(1)
+
+ def hack_recurse(self, realfp, pref):
+ self.d.log("hack realfp={}, relfp={}".format(realfp, os.path.relpath(realfp, pref)))
+ if os.path.isdir(realfp):
+ for cwd, void, entries in os.walk(realfp, topdown=True):
+ for entry in entries:
+ self.hack_recurse2(os.path.join(cwd, entry), pref)
+ self.d.log("node.putfileinfo({}, {})".format(cwd, os.path.relpath(cwd, pref)))
+ self.d.log("node.putfileinfo({}, {})".format(realfp, os.path.relpath(realfp, pref)))
+ else:
+ self.d.log("file {} processing".format(realfp))