dejsem.1.5/python/dejsem.pycharm/client.py
changeset 0 676905a3b03c
equal deleted inserted replaced
-1:000000000000 0:676905a3b03c
       
     1 # coding=utf8
       
     2 
       
     3 import socket, os, sys, time, signal, subprocess
       
     4 from d import D
       
     5 from parms import Parms
       
     6 from node import Node
       
     7 from counter import Counter
       
     8 
       
     9 # síťové operace klienta
       
    10 #	- connect to remote server na base_port+CCx
       
    11 #	- connect to remote peer na host:port, které dostane ze serveru na base_port+CC0 operací GETPEER
       
    12 #	- listen for peer on base_port+CCx - konkrétní host:port zveřejňuje peer na serveru na base_port+CC0 operací SETPEER
       
    13 #		- formalizace čísla portu je ovšem nutná jen když oba peers běží na tomtéž stroji
       
    14 
       
    15 class Client():
       
    16 
       
    17 	def __init__(self, d):
       
    18 
       
    19 		self.d = D("client".format(d.debid))
       
    20 
       
    21 		Parms.clientMode = True
       
    22 
       
    23 		self._orig = Parms.orig		# origin data paths
       
    24 		self._dest = Parms.dest		# destination data path
       
    25 
       
    26 		self._chan = Parms.sslchannel
       
    27 
       
    28 		if self.d.ll(1): self.d.log("pgm={}, homedir={}, action={}, channel={}, debug={}"
       
    29 									.format(sys.argv[0], Parms.client_homedir, Parms.action, self._chan, Parms.debugLevel))
       
    30 		if self.d.ll(3): self.d.log("CE path: {}, CA path: {}".format(Parms.sslCert, Parms.sslCAPath))
       
    31 
       
    32 		act = Parms.action
       
    33 		if	 not act:
       
    34 			return
       
    35 		elif act == "PUSHCLIP":     # from cliboard to remote server
       
    36 			self.pushclip("clipboard")
       
    37 		elif act == "PUSHPRIM":     # from X primary to remote server
       
    38 			self.pushclip("primary")
       
    39 		elif act == "PUSHFILE":     # from local files to remote server
       
    40 			self.pushfile()
       
    41 		elif act == "PUSHPEER":     # from local files to remote peer
       
    42 			self.pushpeer()
       
    43 		elif act == 'PULLCLIP':     # from remote server to cliboard
       
    44 			self.pullclip()
       
    45 		elif act == 'PULLHIST':     # synchronize local clipboard history from server
       
    46 			self.pullhist()
       
    47 		elif act == 'PULLFILE':     # from remote server files to local
       
    48 			self.pullfile()
       
    49 		elif act == 'PULLLIST':     # filelist of server dir
       
    50 			self.pulllist()
       
    51 		elif act == "PULLPEER":     # from remote peer to local
       
    52 			self.pullpeer()
       
    53 		elif act == "SRVHACK":
       
    54 			self.srv_hack()
       
    55 		elif act == "HACK":
       
    56 			self.hack()
       
    57 		else:
       
    58 			self.d.log("ABEND, unknown command {}".format(act))
       
    59 		if hasattr(self, "_node"):
       
    60 			self._node.close_sc()
       
    61 			self._node.close_ssc()
       
    62 
       
    63 	def pushclip(self, buffer):
       
    64 		p = subprocess.Popen(["xclip", "-o", "-selection", buffer], stdout=subprocess.PIPE)
       
    65 		if not p.stdout.read().lstrip():    	# nejdřív ověřit, že na clipboardu něco visí
       
    66 			self.d.abend("{} buf is empty".format(buffer), None)
       
    67 		self._node = Node(self.d, conn=True)    # establish server cmd session
       
    68 		self._node.putcmd("PUSHCLIP")
       
    69 		if self.d.ll(3): self.d.log("push clipboard starting...")
       
    70 		p = subprocess.Popen(["xclip", "-o", "-selection", buffer], stdout=subprocess.PIPE)
       
    71 		self._node.put(p.stdout.read())
       
    72 		p.wait()
       
    73 		if self.d.ll(2): self.d.log("push clipboard end")
       
    74 
       
    75 	def longtask(self):
       
    76 		# příkaz serveru k otevření paralelního portu pro dlouhý přenos
       
    77 		# dostanu číslo portu, uvolním příkazový port a otevřu conn na nový port
       
    78 		self._node.putcmd("LONGTASK")
       
    79 		port = self._node.getnum()
       
    80 		if port == 0:
       
    81 			self.d.abend("all server net ports are busy", None)
       
    82 		self._node.close()     # free server base port 0
       
    83 		self._node = Node(self.d, port=port)
       
    84 
       
    85 	def pushfile(self):
       
    86 		if not self._orig:
       
    87 			self.d.abendMsg("nothing specified")
       
    88 		else:
       
    89 			if not self._dest and len(self._orig) > 1:  # je-li na vstupu více jmen, tak poslední je destdir
       
    90 				self._dest = os.path.normpath(self._orig[-1])
       
    91 				if self._dest.startswith('/'): self._dest = self._dest[1:]  # výstup jenom relativně
       
    92 				del self._orig[-1]
       
    93 			if not any(os.path.exists(p) for p in self._orig):
       
    94 				self.d.abendMsg("{} not found".format(self._orig))
       
    95 			else:
       
    96 				self._node = Node(self.d, conn=True)
       
    97 				self.longtask()     # přechod na datový port
       
    98 				self._counter = Counter(self.d, self.batchSize())    # start transfer progress display
       
    99 				self._node.putcmd("PUSHFILE")
       
   100 				for fp in self._orig:
       
   101 					if self.d.ll(3): self.d.log("pushfile '{}' --> '{}' starting...".format(fp, self._dest))
       
   102 					if os.path.exists(fp):
       
   103 						normfp = os.path.normpath(fp)
       
   104 						self.pushfile_recurse(normfp, os.path.dirname(normfp))
       
   105 					else:
       
   106 						self.d.warn("pushfile: {} not found".format(fp))
       
   107 				self._counter.stop()
       
   108 				self.d.log("pushfile end", sev=2)
       
   109 
       
   110 	def pushfile_recurse(self, fp, prefix):
       
   111 		if self.d.ll(4): self.d.log("fp={}, relative fp={}".format(fp, os.path.relpath(fp, prefix)))
       
   112 		dest = os.path.join(self._dest, os.path.relpath(fp, prefix))
       
   113 		if os.path.isdir(fp):
       
   114 			for cwd, void, entries in os.walk(fp, topdown=True):
       
   115 				for entry in entries:
       
   116 					if self.d.ll(4): self.d.log("cwd={}, entry={}".format(cwd, entry))
       
   117 					self.pushfile_recurse(os.path.join(cwd, entry), prefix)
       
   118 				self._node.putfileinfo(cwd, os.path.join(self._dest, os.path.relpath(cwd, prefix)))
       
   119 			self._node.putfileinfo(fp, dest)
       
   120 		else:
       
   121 			self._node.putfileinfo(fp, dest)
       
   122 			with open(fp, mode='rb') as f:
       
   123 				g = self._node.genput()
       
   124 				g.send(None)
       
   125 				data = f.read(Parms.bufSize)
       
   126 				while data:
       
   127 					self._counter.update(len(data))
       
   128 					if self.d.ll(5): self.d.log("push file: len read from file={}".format(len(data)))
       
   129 					try:
       
   130 						g.send(data)
       
   131 						data = f.read(Parms.bufSize)
       
   132 					except Exception:
       
   133 						break
       
   134 				g.close()
       
   135 
       
   136 	def pullclip(self):
       
   137 		self._node = Node(self.d, conn=True)	# establish server cmd session
       
   138 		self._node.putcmd("PULLCLIP")
       
   139 		self._node.putstr("")   # empty str means LAST entry in clipboard storage on server
       
   140 		if self.d.ll(3): self.d.log("pull clipboard starting...")
       
   141 		p = subprocess.Popen(["xclip", "-i", "-selection", "clipboard"], stdin=subprocess.PIPE)
       
   142 		for data in self._node.genget(size = self._node.getnum()):
       
   143 			p.stdin.write(data)
       
   144 		p.stdin.close()
       
   145 		p.wait()
       
   146 		self._node.close_sc()
       
   147 		if self.d.ll(2): self.d.log("pull clipboard end")
       
   148 
       
   149 	def pullhist(self):
       
   150 		"""
       
   151 		synchronizace historie clipboardu se serverem
       
   152 		- entries z clipboardu se drží na serveru v jednotlivých souborech, které se synchronizují do lokálního dir
       
   153 		- po synchrnizaci se vytvoří indexový soubor seřazený podle timestampů
       
   154 		"""
       
   155 		histdir = Parms.client_histdir
       
   156 		self._node = Node(self.d, conn=True)      # establish server cmd session
       
   157 		self._node.putcmd("PULLHIST")
       
   158 		self.d.log("synchronizing clipboard history...")
       
   159 		os.makedirs(histdir, mode=0o755, exist_ok=True)
       
   160 		os.chdir(histdir)
       
   161 		entries = dict()
       
   162 		toget = dict()
       
   163 		fn = self._node.getfn()
       
   164 		while len(fn) > 0:	# inventarizace serveru
       
   165 			if self.d.ll(5): self.d.log("fn=" + fn)
       
   166 			# entries obsahují prvních 80 bytů z clipboard entry, délku clipboard entry a timestamp
       
   167 			# entries mohou být binární i textové, takže se nedekódují
       
   168 			entries[fn] = (self._node.getstr(decode=False), self._node.getnum(), self._node.getnum())
       
   169 			if not os.path.exists(fn):
       
   170 				toget[fn] = entries[fn]
       
   171 				if self.d.ll(4): self.d.log("fn {} doesn't exists, toget[fn]={}, toget size={}".format(fn, toget[fn], len(toget)))
       
   172 			fn = self._node.getfn()
       
   173 		self.d.log("toget={}, toget size={}".format(toget.keys(), len(toget)), sev=4)
       
   174 		if len(toget):		# synchronizace
       
   175 			for fn in toget.keys():
       
   176 				self._node.putcmd("PULLCLIP")
       
   177 				self._node.putstr(fn)                           # send requested entry name
       
   178 				with open(fn, mode='wb') as f:
       
   179 					for data in self._node.genget(size = self._node.getnum()):
       
   180 						f.write(data)
       
   181 				timestamp = toget[fn][2]
       
   182 				os.utime(fn, (timestamp, timestamp))
       
   183 			self._node.close_sc()
       
   184 		p = subprocess.Popen(["sort", "-k2", "-r"], stdin=subprocess.PIPE, stdout=open('.index', mode='w'), universal_newlines=True)
       
   185 		for fn in os.listdir():	# indexing
       
   186 			if fn == ".index": continue
       
   187 			digest = open(fn, mode="rb").read(80).replace(b'\n', b' ')
       
   188 			try: digest = digest.decode()	# to, co nepůjde dekódovat, necháme být
       
   189 			except UnicodeDecodeError: pass
       
   190 			size = os.path.getsize(fn)
       
   191 			timestamp = int(os.path.getmtime(fn))
       
   192 			p.stdin.write("{} {} {: 6d} {}\n".format(fn, time.strftime("%Y/%m/%d.%H:%M:%S", time.gmtime(timestamp)), size, digest))
       
   193 		p.stdin.close()
       
   194 		p.wait()
       
   195 		if p.returncode == 0:
       
   196 			p = subprocess.Popen(["mc", histdir])
       
   197 			p.wait()
       
   198 		if self.d.ll(5): self.d.log("clipboard history sync finished")
       
   199 
       
   200 
       
   201 	def pullfile(self):
       
   202 		ldp = len(Parms.datapaths)
       
   203 		if ldp:
       
   204 			if ldp > 1:	# alespoň 2 argumenty: poslední arg je destination dir, ostatní args jsou požadavky
       
   205 				self._orig = Parms.datapaths[:ldp-1]
       
   206 				self._dest = Parms.datapaths[-1]
       
   207 			else:		# jedinný arg je požadavek, destination dir podle env DEST
       
   208 				self._orig = Parms.datapaths[:1]
       
   209 				self._dest = Parms.dest
       
   210 		else:			# bez argumentů: požadavek i destinace podle env
       
   211 			self._orig = Parms.orig
       
   212 			self._dest = Parms.dest
       
   213 		if not self._orig:
       
   214 			self.d.abend("no filename specified, ABEND")
       
   215 		else:
       
   216 			self._node = Node(self.d, conn=True)
       
   217 			size, fnum, dnum = (0, 0, 0)
       
   218 			for orig in self._orig:		# zjistíme celkovou velikost dávky pro průběžné sledování
       
   219 				orig = os.path.normpath(orig)
       
   220 				self._node.putcmd("RECKON")
       
   221 				self._node.putstr(orig)
       
   222 				size += self._node.getnum()
       
   223 				fnum += self._node.getnum()
       
   224 				dnum += self._node.getnum()
       
   225 			self.longtask()
       
   226 			self.d.log("pulling {} bytes in {} files and {} dirs...".format(size, fnum, dnum), sev=3)
       
   227 			self._counter = Counter(self.d, size)
       
   228 			for orig in self._orig:		# vlastní download dávky
       
   229 				orig = os.path.normpath(orig)
       
   230 				self._node.putcmd("PULLFILE")
       
   231 				if self.d.ll(4): self.d.log("pull of '{}' starting...".format(orig))
       
   232 				self._node.putstr(orig)
       
   233 				fn = self._node.getfn()
       
   234 				while len(fn) > 0:
       
   235 					fp = os.path.join(self._dest, fn)
       
   236 					size = self._node.getnum()
       
   237 					timestamp = self._node.getnum()
       
   238 					if self.d.ll(4): self.d.log("pull to '{}, dir={}'...".format(fp, size == -1))
       
   239 					if size < 0:
       
   240 						self._node.receive_dir(fp, size, timestamp)
       
   241 					else:
       
   242 						self._node.receive_file(fp, size, timestamp, counter=self._counter)
       
   243 					fn = self._node.getfn()
       
   244 			self._counter.stop()
       
   245 			if self.d.ll(2): self.d.log("pull file end")
       
   246 
       
   247 	def pulllist(self):
       
   248 		"""
       
   249 		po odeslání příkazu se načítají údaje o souborech ve tvaru
       
   250 			<délka_fn><fn><file_size><file_timestamp>
       
   251 		"""
       
   252 		self._node = Node(self.d, conn=True)
       
   253 		self._node.putcmd("PULLLIST")
       
   254 		self._node.putstr(os.path.normpath(self._orig[0] if self._orig else "."))
       
   255 		fn = self._node.getfn()
       
   256 		while fn:
       
   257 			size = self._node.getnum()
       
   258 			timestamp = time.asctime(time.localtime(self._node.getnum()))
       
   259 			print("{: 12d} {:24} {}".format(size, timestamp, fn))
       
   260 			fn = self._node.getfn()
       
   261 			if self.d.ll(5): self.d.log("fn='{}'".format(str(fn)))
       
   262 
       
   263 	def pushpeer(self):
       
   264 		if not self._orig:
       
   265 			self.d.abendMsg("nothing specified")
       
   266 		elif not any(os.path.exists(p) for p in self._orig):
       
   267 			self.d.abendMsg("{} not found".format(self._orig))
       
   268 		else:
       
   269 			self._node = Node(self.d, conn=True, peering=True)
       
   270 			size = self.batchSize()
       
   271 			self.sendBatchSize(size)	# poskytneme partnerovi údaje o velikosti odesílané dávky
       
   272 			self._counter = Counter(self.d, size)
       
   273 			try:
       
   274 				for fp in self._orig:
       
   275 					if self.d.ll(3): self.d.log("pushpeer: from={}".format(fp))
       
   276 					if os.path.exists(fp):
       
   277 						normfp = os.path.normpath(fp)
       
   278 						self.pushpeer_recurse(normfp, os.path.dirname(normfp))
       
   279 					else:
       
   280 						self.d.warn("'{}' not found".format(fp))
       
   281 			finally:
       
   282 				self._node.sendEOD()  # end of batch
       
   283 				self._counter.stop()
       
   284 
       
   285 	def pushpeer_recurse(self, fp, prefix):
       
   286 		if self.d.ll(4): self.d.log("pushpeer recurse: fp={}, relative fp={}".format(fp, os.path.relpath(fp, prefix)))
       
   287 		if os.path.isdir(fp):
       
   288 			for cwd, void, entries in os.walk(fp, topdown=True):
       
   289 				for entry in entries:
       
   290 					self.pushpeer_recurse(os.path.join(cwd, entry), prefix)
       
   291 				self._node.putfileinfo(cwd, os.path.relpath(cwd, prefix))
       
   292 			self._node.putfileinfo(fp, os.path.relpath(fp, prefix))
       
   293 		else:
       
   294 			self._node.putfileinfo(fp, os.path.relpath(fp, prefix))
       
   295 			with open(fp, mode='rb') as f:
       
   296 				g = self._node.genput()
       
   297 				g.send(None)
       
   298 				data = f.read(Parms.bufSize)
       
   299 				while data:
       
   300 					self._counter.update(len(data))
       
   301 					try:
       
   302 						g.send(data)
       
   303 						data = f.read(Parms.bufSize)
       
   304 					except Exception:
       
   305 						break
       
   306 				g.close()
       
   307 		if self.d.ll(4): self.d.log("PUSHPEER: entry {} sent".format(fp))
       
   308 
       
   309 	def sendBatchSize(self, size):
       
   310 		"""
       
   311 		odeslání informace o velikosti připravené dávky dat
       
   312 		informace se odešle formou informace o souboru (filename, filesize, timestamp)
       
   313 		"""
       
   314 		if self.d.ll(4): self.d.log("sending batch size to peer...")
       
   315 		self._node.putstr("dummy fn for batch size")
       
   316 		self._node.putnum(size)
       
   317 		self._node.putnum(0)
       
   318 
       
   319 	def pullpeer(self):
       
   320 		if not Parms.bindhost:
       
   321 			self.d.abend("local host addr for peering not specified", None)
       
   322 		try:
       
   323 			self._node = Node(self.d, host=Parms.bindhost, tryPort=True, conn=False, peering=True)
       
   324 		except Node.AllPortsBusy as e:
       
   325 			self.d.abend("all predefined net ports are busy", None)
       
   326 		try:
       
   327 			self.d.log("accepting...", sev=4)
       
   328 			accepted = self._node.acc(acc_TO=Parms.peer_accept_timeout)
       
   329 			self.d.log("peer {}accepted".format("not " if not accepted else ""), sev=4)
       
   330 			if not accepted: return
       
   331 		except Exception as e:
       
   332 			self._node.close_ssc()
       
   333 			self.d.abend("peer pull accept", None)
       
   334 			return
       
   335 		finally:
       
   336 			self._node.UDPsignalHUP()		# stop UDP broadcast
       
   337 		self.d.log("get batch size from peer (dummy fn)", sev=4)
       
   338 		self._node.getfn()	# dummy fn in batch size info
       
   339 		batchsize = self._node.getnum()
       
   340 		self.d.log("batchsize={}".format(batchsize), sev=4)
       
   341 		self._node.getnum()	# dummy timestamp
       
   342 		counter = Counter(self.d, batchsize)
       
   343 		fn = self._node.getfn()
       
   344 		while fn:	# receive batch of file/dir objects
       
   345 			self.d.log("fn={}".format(fn), sev=4)
       
   346 			fp = os.path.join(self._dest, fn)
       
   347 			size = self._node.getnum()
       
   348 			timestamp = self._node.getnum()
       
   349 			if self.d.ll(4): self.d.log("pulling from peer to '{}'...".format(fp))
       
   350 			if size < 0:
       
   351 				if not self._node.receive_dir(fp, size, timestamp):
       
   352 					self._node.close_sc()
       
   353 			else:
       
   354 				self._node.receive_file(fp, size, timestamp, counter=counter)
       
   355 			fn = self._node.getfn()
       
   356 		counter.stop()
       
   357 		self._node.close_sc()
       
   358 		self._node.close_ssc()
       
   359 
       
   360 	def close_sc(self):
       
   361 		self._node.close_sc()
       
   362 
       
   363 	def dirsize(self, fp):
       
   364 		p = subprocess.Popen(("du", "-sb", fp), stdout=subprocess.PIPE)
       
   365 		p.wait()
       
   366 		return int(p.stdout.readlines()[0].decode().split("\t")[0]) if p.returncode == 0 else -1
       
   367 
       
   368 	def batchSize(self):
       
   369 		size = 0
       
   370 		for fp in self._orig:
       
   371 			if os.path.exists(fp):
       
   372 				size += self.dirsize(fp) if os.path.isdir(fp) else os.path.getsize(fp)
       
   373 		return size
       
   374 
       
   375 	def srv_hack(self):
       
   376 		self._node = Node(self.d, conn=True)
       
   377 		for orig in self._orig:
       
   378 			self._node.putcmd("RECKON")
       
   379 			self._node.putstr(orig)
       
   380 			wholesize = self._node.getnum()
       
   381 			fnum = self._node.getnum()
       
   382 			dnum = self._node.getnum()
       
   383 			self.d.log("reckon: {}, {}, {}".format(wholesize, fnum, dnum))
       
   384 
       
   385 	def hack(self):
       
   386 		self._node = Node(self.d, conn=False, tryPort=False, port=1111, host='10.0.1.47')
       
   387 		self._node.acc(1)
       
   388 
       
	def hack_recurse(self, realfp, pref):
		"""
		Log, without sending anything, the putfileinfo calls for realfp.

		realfp -- path of the file or directory to inspect
		pref   -- prefix against which the relative paths in the log are computed

		For a directory the tree is walked with os.walk and one line is logged per
		visited directory, plus one more for realfp itself at the end; for anything
		else a single "processing" line is logged.
		"""
		self.d.log("hack realfp={}, relfp={}".format(realfp, os.path.relpath(realfp, pref)))
		if os.path.isdir(realfp):
			for cwd, void, entries in os.walk(realfp, topdown=True):
				for entry in entries:
					# NOTE(review): hack_recurse2 is not defined in the visible part of
					# this file - confirm it exists, or whether hack_recurse was meant
					self.hack_recurse2(os.path.join(cwd, entry), pref)
				self.d.log("node.putfileinfo({}, {})".format(cwd, os.path.relpath(cwd, pref)))
			self.d.log("node.putfileinfo({}, {})".format(realfp, os.path.relpath(realfp, pref)))
		else:
			self.d.log("file {} processing".format(realfp))