# coding=utf8
import socket, os, sys, time, signal, subprocess
from d import D
from parms import Parms
from node import Node
from counter import Counter
# síťové operace klienta
# - connect to remote server na base_port+CCx
# - connect to remote peer na host:port, které dostane ze serveru na base_port+CC0 operací GETPEER
# - listen for peer on base_port+CCx - konkrétní host:port zveřejňuje peer na serveru na base_port+CC0 operací SETPEER
# - formalizace čísla portu je ovšem nutná jen když oba peers běží na tomtéž stroji
class Client():
def __init__(self, d):
self.d = D("client".format(d.debid))
Parms.clientMode = True
self._orig = Parms.orig # origin data paths
self._dest = Parms.dest # destination data path
self._chan = Parms.sslchannel
if self.d.ll(1): self.d.log("pgm={}, homedir={}, action={}, channel={}, debug={}"
.format(sys.argv[0], Parms.client_homedir, Parms.action, self._chan, Parms.debugLevel))
if self.d.ll(3): self.d.log("CE path: {}, CA path: {}".format(Parms.sslCert, Parms.sslCAPath))
act = Parms.action
if not act:
return
elif act == "PUSHCLIP": # from cliboard to remote server
self.pushclip("clipboard")
elif act == "PUSHPRIM": # from X primary to remote server
self.pushclip("primary")
elif act == "PUSHFILE": # from local files to remote server
self.pushfile()
elif act == "PUSHPEER": # from local files to remote peer
self.pushpeer()
elif act == 'PULLCLIP': # from remote server to cliboard
self.pullclip()
elif act == 'PULLHIST': # synchronize local clipboard history from server
self.pullhist()
elif act == 'PULLFILE': # from remote server files to local
self.pullfile()
elif act == 'PULLLIST': # filelist of server dir
self.pulllist()
elif act == "PULLPEER": # from remote peer to local
self.pullpeer()
elif act == "SRVHACK":
self.srv_hack()
elif act == "HACK":
self.hack()
else:
self.d.log("ABEND, unknown command {}".format(act))
if hasattr(self, "_node"):
self._node.close_sc()
self._node.close_ssc()
def pushclip(self, buffer):
p = subprocess.Popen(["xclip", "-o", "-selection", buffer], stdout=subprocess.PIPE)
if not p.stdout.read().lstrip(): # nejdřív ověřit, že na clipboardu něco visí
self.d.abend("{} buf is empty".format(buffer), None)
self._node = Node(self.d, conn=True) # establish server cmd session
self._node.putcmd("PUSHCLIP")
if self.d.ll(3): self.d.log("push clipboard starting...")
p = subprocess.Popen(["xclip", "-o", "-selection", buffer], stdout=subprocess.PIPE)
self._node.put(p.stdout.read())
p.wait()
if self.d.ll(2): self.d.log("push clipboard end")
def longtask(self):
# příkaz serveru k otevření paralelního portu pro dlouhý přenos
# dostanu číslo portu, uvolním příkazový port a otevřu conn na nový port
self._node.putcmd("LONGTASK")
port = self._node.getnum()
if port == 0:
self.d.abend("all server net ports are busy", None)
self._node.close() # free server base port 0
self._node = Node(self.d, port=port)
def pushfile(self):
if not self._orig:
self.d.abendMsg("nothing specified")
else:
if not self._dest and len(self._orig) > 1: # je-li na vstupu více jmen, tak poslední je destdir
self._dest = os.path.normpath(self._orig[-1])
if self._dest.startswith('/'): self._dest = self._dest[1:] # výstup jenom relativně
del self._orig[-1]
if not any(os.path.exists(p) for p in self._orig):
self.d.abendMsg("{} not found".format(self._orig))
else:
self._node = Node(self.d, conn=True)
self.longtask() # přechod na datový port
self._counter = Counter(self.d, self.batchSize()) # start transfer progress display
self._node.putcmd("PUSHFILE")
for fp in self._orig:
if self.d.ll(3): self.d.log("pushfile '{}' --> '{}' starting...".format(fp, self._dest))
if os.path.exists(fp):
normfp = os.path.normpath(fp)
self.pushfile_recurse(normfp, os.path.dirname(normfp))
else:
self.d.warn("pushfile: {} not found".format(fp))
self._counter.stop()
self.d.log("pushfile end", sev=2)
def pushfile_recurse(self, fp, prefix):
if self.d.ll(4): self.d.log("fp={}, relative fp={}".format(fp, os.path.relpath(fp, prefix)))
dest = os.path.join(self._dest, os.path.relpath(fp, prefix))
if os.path.isdir(fp):
for cwd, void, entries in os.walk(fp, topdown=True):
for entry in entries:
if self.d.ll(4): self.d.log("cwd={}, entry={}".format(cwd, entry))
self.pushfile_recurse(os.path.join(cwd, entry), prefix)
self._node.putfileinfo(cwd, os.path.join(self._dest, os.path.relpath(cwd, prefix)))
self._node.putfileinfo(fp, dest)
else:
self._node.putfileinfo(fp, dest)
with open(fp, mode='rb') as f:
g = self._node.genput()
g.send(None)
data = f.read(Parms.bufSize)
while data:
self._counter.update(len(data))
if self.d.ll(5): self.d.log("push file: len read from file={}".format(len(data)))
try:
g.send(data)
data = f.read(Parms.bufSize)
except Exception:
break
g.close()
def pullclip(self):
self._node = Node(self.d, conn=True) # establish server cmd session
self._node.putcmd("PULLCLIP")
self._node.putstr("") # empty str means LAST entry in clipboard storage on server
if self.d.ll(3): self.d.log("pull clipboard starting...")
p = subprocess.Popen(["xclip", "-i", "-selection", "clipboard"], stdin=subprocess.PIPE)
for data in self._node.genget(size = self._node.getnum()):
p.stdin.write(data)
p.stdin.close()
p.wait()
self._node.close_sc()
if self.d.ll(2): self.d.log("pull clipboard end")
def pullhist(self):
"""
synchronizace historie clipboardu se serverem
- entries z clipboardu se drží na serveru v jednotlivých souborech, které se synchronizují do lokálního dir
- po synchrnizaci se vytvoří indexový soubor seřazený podle timestampů
"""
histdir = Parms.client_histdir
self._node = Node(self.d, conn=True) # establish server cmd session
self._node.putcmd("PULLHIST")
self.d.log("synchronizing clipboard history...")
os.makedirs(histdir, mode=0o755, exist_ok=True)
os.chdir(histdir)
entries = dict()
toget = dict()
fn = self._node.getfn()
while len(fn) > 0: # inventarizace serveru
if self.d.ll(5): self.d.log("fn=" + fn)
# entries obsahují prvních 80 bytů z clipboard entry, délku clipboard entry a timestamp
# entries mohou být binární i textové, takže se nedekódují
entries[fn] = (self._node.getstr(decode=False), self._node.getnum(), self._node.getnum())
if not os.path.exists(fn):
toget[fn] = entries[fn]
if self.d.ll(4): self.d.log("fn {} doesn't exists, toget[fn]={}, toget size={}".format(fn, toget[fn], len(toget)))
fn = self._node.getfn()
self.d.log("toget={}, toget size={}".format(toget.keys(), len(toget)), sev=4)
if len(toget): # synchronizace
for fn in toget.keys():
self._node.putcmd("PULLCLIP")
self._node.putstr(fn) # send requested entry name
with open(fn, mode='wb') as f:
for data in self._node.genget(size = self._node.getnum()):
f.write(data)
timestamp = toget[fn][2]
os.utime(fn, (timestamp, timestamp))
self._node.close_sc()
p = subprocess.Popen(["sort", "-k2", "-r"], stdin=subprocess.PIPE, stdout=open('.index', mode='w'), universal_newlines=True)
for fn in os.listdir(): # indexing
if fn == ".index": continue
digest = open(fn, mode="rb").read(80).replace(b'\n', b' ')
try: digest = digest.decode() # to, co nepůjde dekódovat, necháme být
except UnicodeDecodeError: pass
size = os.path.getsize(fn)
timestamp = int(os.path.getmtime(fn))
p.stdin.write("{} {} {: 6d} {}\n".format(fn, time.strftime("%Y/%m/%d.%H:%M:%S", time.gmtime(timestamp)), size, digest))
p.stdin.close()
p.wait()
if p.returncode == 0:
p = subprocess.Popen(["mc", histdir])
p.wait()
if self.d.ll(5): self.d.log("clipboard history sync finished")
def pullfile(self):
ldp = len(Parms.datapaths)
if ldp:
if ldp > 1: # alespoň 2 argumenty: poslední arg je destination dir, ostatní args jsou požadavky
self._orig = Parms.datapaths[:ldp-1]
self._dest = Parms.datapaths[-1]
else: # jedinný arg je požadavek, destination dir podle env DEST
self._orig = Parms.datapaths[:1]
self._dest = Parms.dest
else: # bez argumentů: požadavek i destinace podle env
self._orig = Parms.orig
self._dest = Parms.dest
if not self._orig:
self.d.abend("no filename specified, ABEND")
else:
self._node = Node(self.d, conn=True)
size, fnum, dnum = (0, 0, 0)
for orig in self._orig: # zjistíme celkovou velikost dávky pro průběžné sledování
orig = os.path.normpath(orig)
self._node.putcmd("RECKON")
self._node.putstr(orig)
size += self._node.getnum()
fnum += self._node.getnum()
dnum += self._node.getnum()
self.longtask()
self.d.log("pulling {} bytes in {} files and {} dirs...".format(size, fnum, dnum), sev=3)
self._counter = Counter(self.d, size)
for orig in self._orig: # vlastní download dávky
orig = os.path.normpath(orig)
self._node.putcmd("PULLFILE")
if self.d.ll(4): self.d.log("pull of '{}' starting...".format(orig))
self._node.putstr(orig)
fn = self._node.getfn()
while len(fn) > 0:
fp = os.path.join(self._dest, fn)
size = self._node.getnum()
timestamp = self._node.getnum()
if self.d.ll(4): self.d.log("pull to '{}, dir={}'...".format(fp, size == -1))
if size < 0:
self._node.receive_dir(fp, size, timestamp)
else:
self._node.receive_file(fp, size, timestamp, counter=self._counter)
fn = self._node.getfn()
self._counter.stop()
if self.d.ll(2): self.d.log("pull file end")
def pulllist(self):
"""
po odeslání příkazu se načítají údaje o souborech ve tvaru
<délka_fn><fn><file_size><file_timestamp>
"""
self._node = Node(self.d, conn=True)
self._node.putcmd("PULLLIST")
self._node.putstr(os.path.normpath(self._orig[0] if self._orig else "."))
fn = self._node.getfn()
while fn:
size = self._node.getnum()
timestamp = time.asctime(time.localtime(self._node.getnum()))
print("{: 12d} {:24} {}".format(size, timestamp, fn))
fn = self._node.getfn()
if self.d.ll(5): self.d.log("fn='{}'".format(str(fn)))
def pushpeer(self):
if not self._orig:
self.d.abendMsg("nothing specified")
elif not any(os.path.exists(p) for p in self._orig):
self.d.abendMsg("{} not found".format(self._orig))
else:
self._node = Node(self.d, conn=True, peering=True)
size = self.batchSize()
self.sendBatchSize(size) # poskytneme partnerovi údaje o velikosti odesílané dávky
self._counter = Counter(self.d, size)
try:
for fp in self._orig:
if self.d.ll(3): self.d.log("pushpeer: from={}".format(fp))
if os.path.exists(fp):
normfp = os.path.normpath(fp)
self.pushpeer_recurse(normfp, os.path.dirname(normfp))
else:
self.d.warn("'{}' not found".format(fp))
finally:
self._node.sendEOD() # end of batch
self._counter.stop()
def pushpeer_recurse(self, fp, prefix):
if self.d.ll(4): self.d.log("pushpeer recurse: fp={}, relative fp={}".format(fp, os.path.relpath(fp, prefix)))
if os.path.isdir(fp):
for cwd, void, entries in os.walk(fp, topdown=True):
for entry in entries:
self.pushpeer_recurse(os.path.join(cwd, entry), prefix)
self._node.putfileinfo(cwd, os.path.relpath(cwd, prefix))
self._node.putfileinfo(fp, os.path.relpath(fp, prefix))
else:
self._node.putfileinfo(fp, os.path.relpath(fp, prefix))
with open(fp, mode='rb') as f:
g = self._node.genput()
g.send(None)
data = f.read(Parms.bufSize)
while data:
self._counter.update(len(data))
try:
g.send(data)
data = f.read(Parms.bufSize)
except Exception:
break
g.close()
if self.d.ll(4): self.d.log("PUSHPEER: entry {} sent".format(fp))
def sendBatchSize(self, size):
"""
odeslání informace o velikosti připravené dávky dat
informace se odešle formou informace o souboru (filename, filesize, timestamp)
"""
if self.d.ll(4): self.d.log("sending batch size to peer...")
self._node.putstr("dummy fn for batch size")
self._node.putnum(size)
self._node.putnum(0)
def pullpeer(self):
if not Parms.bindhost:
self.d.abend("local host addr for peering not specified", None)
try:
self._node = Node(self.d, host=Parms.bindhost, tryPort=True, conn=False, peering=True)
except Node.AllPortsBusy as e:
self.d.abend("all predefined net ports are busy", None)
try:
self.d.log("accepting...", sev=4)
accepted = self._node.acc(acc_TO=Parms.peer_accept_timeout)
self.d.log("peer {}accepted".format("not " if not accepted else ""), sev=4)
if not accepted: return
except Exception as e:
self._node.close_ssc()
self.d.abend("peer pull accept", None)
return
finally:
self._node.UDPsignalHUP() # stop UDP broadcast
self.d.log("get batch size from peer (dummy fn)", sev=4)
self._node.getfn() # dummy fn in batch size info
batchsize = self._node.getnum()
self.d.log("batchsize={}".format(batchsize), sev=4)
self._node.getnum() # dummy timestamp
counter = Counter(self.d, batchsize)
fn = self._node.getfn()
while fn: # receive batch of file/dir objects
self.d.log("fn={}".format(fn), sev=4)
fp = os.path.join(self._dest, fn)
size = self._node.getnum()
timestamp = self._node.getnum()
if self.d.ll(4): self.d.log("pulling from peer to '{}'...".format(fp))
if size < 0:
if not self._node.receive_dir(fp, size, timestamp):
self._node.close_sc()
else:
self._node.receive_file(fp, size, timestamp, counter=counter)
fn = self._node.getfn()
counter.stop()
self._node.close_sc()
self._node.close_ssc()
def close_sc(self):
self._node.close_sc()
def dirsize(self, fp):
p = subprocess.Popen(("du", "-sb", fp), stdout=subprocess.PIPE)
p.wait()
return int(p.stdout.readlines()[0].decode().split("\t")[0]) if p.returncode == 0 else -1
def batchSize(self):
size = 0
for fp in self._orig:
if os.path.exists(fp):
size += self.dirsize(fp) if os.path.isdir(fp) else os.path.getsize(fp)
return size
def srv_hack(self):
self._node = Node(self.d, conn=True)
for orig in self._orig:
self._node.putcmd("RECKON")
self._node.putstr(orig)
wholesize = self._node.getnum()
fnum = self._node.getnum()
dnum = self._node.getnum()
self.d.log("reckon: {}, {}, {}".format(wholesize, fnum, dnum))
def hack(self):
self._node = Node(self.d, conn=False, tryPort=False, port=1111, host='10.0.1.47')
self._node.acc(1)
def hack_recurse(self, realfp, pref):
self.d.log("hack realfp={}, relfp={}".format(realfp, os.path.relpath(realfp, pref)))
if os.path.isdir(realfp):
for cwd, void, entries in os.walk(realfp, topdown=True):
for entry in entries:
self.hack_recurse2(os.path.join(cwd, entry), pref)
self.d.log("node.putfileinfo({}, {})".format(cwd, os.path.relpath(cwd, pref)))
self.d.log("node.putfileinfo({}, {})".format(realfp, os.path.relpath(realfp, pref)))
else:
self.d.log("file {} processing".format(realfp))