dejsem.1.5/python/dejsem.pycharm/client.py
changeset 0 676905a3b03c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dejsem.1.5/python/dejsem.pycharm/client.py	Wed Nov 27 09:50:16 2019 +0100
@@ -0,0 +1,398 @@
+# coding=utf8
+
+import socket, os, sys, time, signal, subprocess
+from d import D
+from parms import Parms
+from node import Node
+from counter import Counter
+
+# síťové operace klienta
+#	- connect to remote server na base_port+CCx
+#	- connect to remote peer na host:port, které dostane ze serveru na base_port+CC0 operací GETPEER
+#	- listen for peer on base_port+CCx - konkrétní host:port zveřejňuje peer na serveru na base_port+CC0 operací SETPEER
+#		- formalizace čísla portu je ovšem nutná jen když oba peers běží na tomtéž stroji
+
+class Client():
+
+	def __init__(self, d):
+
+		self.d = D("client".format(d.debid))
+
+		Parms.clientMode = True
+
+		self._orig = Parms.orig		# origin data paths
+		self._dest = Parms.dest		# destination data path
+
+		self._chan = Parms.sslchannel
+
+		if self.d.ll(1): self.d.log("pgm={}, homedir={}, action={}, channel={}, debug={}"
+									.format(sys.argv[0], Parms.client_homedir, Parms.action, self._chan, Parms.debugLevel))
+		if self.d.ll(3): self.d.log("CE path: {}, CA path: {}".format(Parms.sslCert, Parms.sslCAPath))
+
+		act = Parms.action
+		if	 not act:
+			return
+		elif act == "PUSHCLIP":     # from cliboard to remote server
+			self.pushclip("clipboard")
+		elif act == "PUSHPRIM":     # from X primary to remote server
+			self.pushclip("primary")
+		elif act == "PUSHFILE":     # from local files to remote server
+			self.pushfile()
+		elif act == "PUSHPEER":     # from local files to remote peer
+			self.pushpeer()
+		elif act == 'PULLCLIP':     # from remote server to cliboard
+			self.pullclip()
+		elif act == 'PULLHIST':     # synchronize local clipboard history from server
+			self.pullhist()
+		elif act == 'PULLFILE':     # from remote server files to local
+			self.pullfile()
+		elif act == 'PULLLIST':     # filelist of server dir
+			self.pulllist()
+		elif act == "PULLPEER":     # from remote peer to local
+			self.pullpeer()
+		elif act == "SRVHACK":
+			self.srv_hack()
+		elif act == "HACK":
+			self.hack()
+		else:
+			self.d.log("ABEND, unknown command {}".format(act))
+		if hasattr(self, "_node"):
+			self._node.close_sc()
+			self._node.close_ssc()
+
+	def pushclip(self, buffer):
+		p = subprocess.Popen(["xclip", "-o", "-selection", buffer], stdout=subprocess.PIPE)
+		if not p.stdout.read().lstrip():    	# nejdřív ověřit, že na clipboardu něco visí
+			self.d.abend("{} buf is empty".format(buffer), None)
+		self._node = Node(self.d, conn=True)    # establish server cmd session
+		self._node.putcmd("PUSHCLIP")
+		if self.d.ll(3): self.d.log("push clipboard starting...")
+		p = subprocess.Popen(["xclip", "-o", "-selection", buffer], stdout=subprocess.PIPE)
+		self._node.put(p.stdout.read())
+		p.wait()
+		if self.d.ll(2): self.d.log("push clipboard end")
+
+	def longtask(self):
+		# příkaz serveru k otevření paralelního portu pro dlouhý přenos
+		# dostanu číslo portu, uvolním příkazový port a otevřu conn na nový port
+		self._node.putcmd("LONGTASK")
+		port = self._node.getnum()
+		if port == 0:
+			self.d.abend("all server net ports are busy", None)
+		self._node.close()     # free server base port 0
+		self._node = Node(self.d, port=port)
+
+	def pushfile(self):
+		if not self._orig:
+			self.d.abendMsg("nothing specified")
+		else:
+			if not self._dest and len(self._orig) > 1:  # je-li na vstupu více jmen, tak poslední je destdir
+				self._dest = os.path.normpath(self._orig[-1])
+				if self._dest.startswith('/'): self._dest = self._dest[1:]  # výstup jenom relativně
+				del self._orig[-1]
+			if not any(os.path.exists(p) for p in self._orig):
+				self.d.abendMsg("{} not found".format(self._orig))
+			else:
+				self._node = Node(self.d, conn=True)
+				self.longtask()     # přechod na datový port
+				self._counter = Counter(self.d, self.batchSize())    # start transfer progress display
+				self._node.putcmd("PUSHFILE")
+				for fp in self._orig:
+					if self.d.ll(3): self.d.log("pushfile '{}' --> '{}' starting...".format(fp, self._dest))
+					if os.path.exists(fp):
+						normfp = os.path.normpath(fp)
+						self.pushfile_recurse(normfp, os.path.dirname(normfp))
+					else:
+						self.d.warn("pushfile: {} not found".format(fp))
+				self._counter.stop()
+				self.d.log("pushfile end", sev=2)
+
+	def pushfile_recurse(self, fp, prefix):
+		if self.d.ll(4): self.d.log("fp={}, relative fp={}".format(fp, os.path.relpath(fp, prefix)))
+		dest = os.path.join(self._dest, os.path.relpath(fp, prefix))
+		if os.path.isdir(fp):
+			for cwd, void, entries in os.walk(fp, topdown=True):
+				for entry in entries:
+					if self.d.ll(4): self.d.log("cwd={}, entry={}".format(cwd, entry))
+					self.pushfile_recurse(os.path.join(cwd, entry), prefix)
+				self._node.putfileinfo(cwd, os.path.join(self._dest, os.path.relpath(cwd, prefix)))
+			self._node.putfileinfo(fp, dest)
+		else:
+			self._node.putfileinfo(fp, dest)
+			with open(fp, mode='rb') as f:
+				g = self._node.genput()
+				g.send(None)
+				data = f.read(Parms.bufSize)
+				while data:
+					self._counter.update(len(data))
+					if self.d.ll(5): self.d.log("push file: len read from file={}".format(len(data)))
+					try:
+						g.send(data)
+						data = f.read(Parms.bufSize)
+					except Exception:
+						break
+				g.close()
+
+	def pullclip(self):
+		self._node = Node(self.d, conn=True)	# establish server cmd session
+		self._node.putcmd("PULLCLIP")
+		self._node.putstr("")   # empty str means LAST entry in clipboard storage on server
+		if self.d.ll(3): self.d.log("pull clipboard starting...")
+		p = subprocess.Popen(["xclip", "-i", "-selection", "clipboard"], stdin=subprocess.PIPE)
+		for data in self._node.genget(size = self._node.getnum()):
+			p.stdin.write(data)
+		p.stdin.close()
+		p.wait()
+		self._node.close_sc()
+		if self.d.ll(2): self.d.log("pull clipboard end")
+
+	def pullhist(self):
+		"""
+		synchronizace historie clipboardu se serverem
+		- entries z clipboardu se drží na serveru v jednotlivých souborech, které se synchronizují do lokálního dir
+		- po synchrnizaci se vytvoří indexový soubor seřazený podle timestampů
+		"""
+		histdir = Parms.client_histdir
+		self._node = Node(self.d, conn=True)      # establish server cmd session
+		self._node.putcmd("PULLHIST")
+		self.d.log("synchronizing clipboard history...")
+		os.makedirs(histdir, mode=0o755, exist_ok=True)
+		os.chdir(histdir)
+		entries = dict()
+		toget = dict()
+		fn = self._node.getfn()
+		while len(fn) > 0:	# inventarizace serveru
+			if self.d.ll(5): self.d.log("fn=" + fn)
+			# entries obsahují prvních 80 bytů z clipboard entry, délku clipboard entry a timestamp
+			# entries mohou být binární i textové, takže se nedekódují
+			entries[fn] = (self._node.getstr(decode=False), self._node.getnum(), self._node.getnum())
+			if not os.path.exists(fn):
+				toget[fn] = entries[fn]
+				if self.d.ll(4): self.d.log("fn {} doesn't exists, toget[fn]={}, toget size={}".format(fn, toget[fn], len(toget)))
+			fn = self._node.getfn()
+		self.d.log("toget={}, toget size={}".format(toget.keys(), len(toget)), sev=4)
+		if len(toget):		# synchronizace
+			for fn in toget.keys():
+				self._node.putcmd("PULLCLIP")
+				self._node.putstr(fn)                           # send requested entry name
+				with open(fn, mode='wb') as f:
+					for data in self._node.genget(size = self._node.getnum()):
+						f.write(data)
+				timestamp = toget[fn][2]
+				os.utime(fn, (timestamp, timestamp))
+			self._node.close_sc()
+		p = subprocess.Popen(["sort", "-k2", "-r"], stdin=subprocess.PIPE, stdout=open('.index', mode='w'), universal_newlines=True)
+		for fn in os.listdir():	# indexing
+			if fn == ".index": continue
+			digest = open(fn, mode="rb").read(80).replace(b'\n', b' ')
+			try: digest = digest.decode()	# to, co nepůjde dekódovat, necháme být
+			except UnicodeDecodeError: pass
+			size = os.path.getsize(fn)
+			timestamp = int(os.path.getmtime(fn))
+			p.stdin.write("{} {} {: 6d} {}\n".format(fn, time.strftime("%Y/%m/%d.%H:%M:%S", time.gmtime(timestamp)), size, digest))
+		p.stdin.close()
+		p.wait()
+		if p.returncode == 0:
+			p = subprocess.Popen(["mc", histdir])
+			p.wait()
+		if self.d.ll(5): self.d.log("clipboard history sync finished")
+
+
+	def pullfile(self):
+		ldp = len(Parms.datapaths)
+		if ldp:
+			if ldp > 1:	# alespoň 2 argumenty: poslední arg je destination dir, ostatní args jsou požadavky
+				self._orig = Parms.datapaths[:ldp-1]
+				self._dest = Parms.datapaths[-1]
+			else:		# jedinný arg je požadavek, destination dir podle env DEST
+				self._orig = Parms.datapaths[:1]
+				self._dest = Parms.dest
+		else:			# bez argumentů: požadavek i destinace podle env
+			self._orig = Parms.orig
+			self._dest = Parms.dest
+		if not self._orig:
+			self.d.abend("no filename specified, ABEND")
+		else:
+			self._node = Node(self.d, conn=True)
+			size, fnum, dnum = (0, 0, 0)
+			for orig in self._orig:		# zjistíme celkovou velikost dávky pro průběžné sledování
+				orig = os.path.normpath(orig)
+				self._node.putcmd("RECKON")
+				self._node.putstr(orig)
+				size += self._node.getnum()
+				fnum += self._node.getnum()
+				dnum += self._node.getnum()
+			self.longtask()
+			self.d.log("pulling {} bytes in {} files and {} dirs...".format(size, fnum, dnum), sev=3)
+			self._counter = Counter(self.d, size)
+			for orig in self._orig:		# vlastní download dávky
+				orig = os.path.normpath(orig)
+				self._node.putcmd("PULLFILE")
+				if self.d.ll(4): self.d.log("pull of '{}' starting...".format(orig))
+				self._node.putstr(orig)
+				fn = self._node.getfn()
+				while len(fn) > 0:
+					fp = os.path.join(self._dest, fn)
+					size = self._node.getnum()
+					timestamp = self._node.getnum()
+					if self.d.ll(4): self.d.log("pull to '{}, dir={}'...".format(fp, size == -1))
+					if size < 0:
+						self._node.receive_dir(fp, size, timestamp)
+					else:
+						self._node.receive_file(fp, size, timestamp, counter=self._counter)
+					fn = self._node.getfn()
+			self._counter.stop()
+			if self.d.ll(2): self.d.log("pull file end")
+
+	def pulllist(self):
+		"""
+		po odeslání příkazu se načítají údaje o souborech ve tvaru
+			<délka_fn><fn><file_size><file_timestamp>
+		"""
+		self._node = Node(self.d, conn=True)
+		self._node.putcmd("PULLLIST")
+		self._node.putstr(os.path.normpath(self._orig[0] if self._orig else "."))
+		fn = self._node.getfn()
+		while fn:
+			size = self._node.getnum()
+			timestamp = time.asctime(time.localtime(self._node.getnum()))
+			print("{: 12d} {:24} {}".format(size, timestamp, fn))
+			fn = self._node.getfn()
+			if self.d.ll(5): self.d.log("fn='{}'".format(str(fn)))
+
+	def pushpeer(self):
+		if not self._orig:
+			self.d.abendMsg("nothing specified")
+		elif not any(os.path.exists(p) for p in self._orig):
+			self.d.abendMsg("{} not found".format(self._orig))
+		else:
+			self._node = Node(self.d, conn=True, peering=True)
+			size = self.batchSize()
+			self.sendBatchSize(size)	# poskytneme partnerovi údaje o velikosti odesílané dávky
+			self._counter = Counter(self.d, size)
+			try:
+				for fp in self._orig:
+					if self.d.ll(3): self.d.log("pushpeer: from={}".format(fp))
+					if os.path.exists(fp):
+						normfp = os.path.normpath(fp)
+						self.pushpeer_recurse(normfp, os.path.dirname(normfp))
+					else:
+						self.d.warn("'{}' not found".format(fp))
+			finally:
+				self._node.sendEOD()  # end of batch
+				self._counter.stop()
+
+	def pushpeer_recurse(self, fp, prefix):
+		if self.d.ll(4): self.d.log("pushpeer recurse: fp={}, relative fp={}".format(fp, os.path.relpath(fp, prefix)))
+		if os.path.isdir(fp):
+			for cwd, void, entries in os.walk(fp, topdown=True):
+				for entry in entries:
+					self.pushpeer_recurse(os.path.join(cwd, entry), prefix)
+				self._node.putfileinfo(cwd, os.path.relpath(cwd, prefix))
+			self._node.putfileinfo(fp, os.path.relpath(fp, prefix))
+		else:
+			self._node.putfileinfo(fp, os.path.relpath(fp, prefix))
+			with open(fp, mode='rb') as f:
+				g = self._node.genput()
+				g.send(None)
+				data = f.read(Parms.bufSize)
+				while data:
+					self._counter.update(len(data))
+					try:
+						g.send(data)
+						data = f.read(Parms.bufSize)
+					except Exception:
+						break
+				g.close()
+		if self.d.ll(4): self.d.log("PUSHPEER: entry {} sent".format(fp))
+
+	def sendBatchSize(self, size):
+		"""
+		odeslání informace o velikosti připravené dávky dat
+		informace se odešle formou informace o souboru (filename, filesize, timestamp)
+		"""
+		if self.d.ll(4): self.d.log("sending batch size to peer...")
+		self._node.putstr("dummy fn for batch size")
+		self._node.putnum(size)
+		self._node.putnum(0)
+
+	def pullpeer(self):
+		if not Parms.bindhost:
+			self.d.abend("local host addr for peering not specified", None)
+		try:
+			self._node = Node(self.d, host=Parms.bindhost, tryPort=True, conn=False, peering=True)
+		except Node.AllPortsBusy as e:
+			self.d.abend("all predefined net ports are busy", None)
+		try:
+			self.d.log("accepting...", sev=4)
+			accepted = self._node.acc(acc_TO=Parms.peer_accept_timeout)
+			self.d.log("peer {}accepted".format("not " if not accepted else ""), sev=4)
+			if not accepted: return
+		except Exception as e:
+			self._node.close_ssc()
+			self.d.abend("peer pull accept", None)
+			return
+		finally:
+			self._node.UDPsignalHUP()		# stop UDP broadcast
+		self.d.log("get batch size from peer (dummy fn)", sev=4)
+		self._node.getfn()	# dummy fn in batch size info
+		batchsize = self._node.getnum()
+		self.d.log("batchsize={}".format(batchsize), sev=4)
+		self._node.getnum()	# dummy timestamp
+		counter = Counter(self.d, batchsize)
+		fn = self._node.getfn()
+		while fn:	# receive batch of file/dir objects
+			self.d.log("fn={}".format(fn), sev=4)
+			fp = os.path.join(self._dest, fn)
+			size = self._node.getnum()
+			timestamp = self._node.getnum()
+			if self.d.ll(4): self.d.log("pulling from peer to '{}'...".format(fp))
+			if size < 0:
+				if not self._node.receive_dir(fp, size, timestamp):
+					self._node.close_sc()
+			else:
+				self._node.receive_file(fp, size, timestamp, counter=counter)
+			fn = self._node.getfn()
+		counter.stop()
+		self._node.close_sc()
+		self._node.close_ssc()
+
+	def close_sc(self):
+		self._node.close_sc()
+
+	def dirsize(self, fp):
+		p = subprocess.Popen(("du", "-sb", fp), stdout=subprocess.PIPE)
+		p.wait()
+		return int(p.stdout.readlines()[0].decode().split("\t")[0]) if p.returncode == 0 else -1
+
+	def batchSize(self):
+		size = 0
+		for fp in self._orig:
+			if os.path.exists(fp):
+				size += self.dirsize(fp) if os.path.isdir(fp) else os.path.getsize(fp)
+		return size
+
+	def srv_hack(self):
+		self._node = Node(self.d, conn=True)
+		for orig in self._orig:
+			self._node.putcmd("RECKON")
+			self._node.putstr(orig)
+			wholesize = self._node.getnum()
+			fnum = self._node.getnum()
+			dnum = self._node.getnum()
+			self.d.log("reckon: {}, {}, {}".format(wholesize, fnum, dnum))
+
+	def hack(self):
+		self._node = Node(self.d, conn=False, tryPort=False, port=1111, host='10.0.1.47')
+		self._node.acc(1)
+
+	def hack_recurse(self, realfp, pref):
+		self.d.log("hack realfp={}, relfp={}".format(realfp, os.path.relpath(realfp, pref)))
+		if os.path.isdir(realfp):
+			for cwd, void, entries in os.walk(realfp, topdown=True):
+				for entry in entries:
+					self.hack_recurse2(os.path.join(cwd, entry), pref)
+				self.d.log("node.putfileinfo({}, {})".format(cwd, os.path.relpath(cwd, pref)))
+			self.d.log("node.putfileinfo({}, {})".format(realfp, os.path.relpath(realfp, pref)))
+		else:
+			self.d.log("file {} processing".format(realfp))