|
1 # coding=utf8 |
|
2 |
|
3 import socket, os, sys, time, signal, subprocess |
|
4 from d import D |
|
5 from parms import Parms |
|
6 from node import Node |
|
7 from counter import Counter |
|
8 |
|
9 # síťové operace klienta |
|
10 # - connect to remote server na base_port+CCx |
|
11 # - connect to remote peer na host:port, které dostane ze serveru na base_port+CC0 operací GETPEER |
|
12 # - listen for peer on base_port+CCx - konkrétní host:port zveřejňuje peer na serveru na base_port+CC0 operací SETPEER |
|
13 # - formalizace čísla portu je ovšem nutná jen když oba peers běží na tomtéž stroji |
|
14 |
|
15 class Client(): |
|
16 |
|
def __init__(self, d):
    """Configure the client from ``Parms`` and run the action named by ``Parms.action``.

    d: debug/logging object of the caller; only its ``debid`` attribute is read here.

    When ``Parms.action`` is empty the constructor returns right after the
    setup.  Otherwise the matching method is run; an unrecognised action is
    only logged.  Afterwards, if an action created ``self._node``, both of its
    endpoints are closed via ``close_sc()`` and ``close_ssc()``.
    """
    # NOTE(review): the format string has no placeholder, so d.debid is read
    # but does not end up in the name passed to D -- confirm this is intended.
    self.d = D("client".format(d.debid))

    Parms.clientMode = True

    self._orig = Parms.orig  # origin data paths
    self._dest = Parms.dest  # destination data path
    self._chan = Parms.sslchannel

    if self.d.ll(1):
        startup = "pgm={}, homedir={}, action={}, channel={}, debug={}".format(
            sys.argv[0], Parms.client_homedir, Parms.action, self._chan, Parms.debugLevel)
        self.d.log(startup)
    if self.d.ll(3):
        self.d.log("CE path: {}, CA path: {}".format(Parms.sslCert, Parms.sslCAPath))

    requested = Parms.action
    if not requested:
        return

    # (action name, method name, positional arguments); scanned in order with ==
    actions = (
        ("PUSHCLIP", "pushclip", ("clipboard",)),  # from clipboard to remote server
        ("PUSHPRIM", "pushclip", ("primary",)),    # from X primary to remote server
        ("PUSHFILE", "pushfile", ()),              # from local files to remote server
        ("PUSHPEER", "pushpeer", ()),              # from local files to remote peer
        ("PULLCLIP", "pullclip", ()),              # from remote server to clipboard
        ("PULLHIST", "pullhist", ()),              # synchronize local clipboard history from server
        ("PULLFILE", "pullfile", ()),              # from remote server files to local
        ("PULLLIST", "pulllist", ()),              # file list of server dir
        ("PULLPEER", "pullpeer", ()),              # from remote peer to local
        ("SRVHACK", "srv_hack", ()),
        ("HACK", "hack", ()),
    )
    for action_name, method_name, args in actions:
        if requested == action_name:
            getattr(self, method_name)(*args)
            break
    else:
        self.d.log("ABEND, unknown command {}".format(requested))

    # an action that opened a node leaves it in self._node; release it here
    if hasattr(self, "_node"):
        self._node.close_sc()
        self._node.close_ssc()
|
62 |
|
def pushclip(self, buffer):
    """Send the content of an X selection to the remote server (PUSHCLIP).

    buffer: selection name handed to ``xclip -selection`` (``__init__`` passes
        "clipboard" or "primary").

    The selection is read exactly once: the same bytes are used for the
    emptiness check and for the transfer, so the payload cannot change
    between the check and the send, and the xclip process is always reaped.
    """
    p = subprocess.Popen(["xclip", "-o", "-selection", buffer], stdout=subprocess.PIPE)
    content, _ = p.communicate()  # reads stdout to EOF and waits for xclip
    if not content.lstrip():  # make sure there is something in the buffer first
        # NOTE(review): presumably abend() terminates the program; if it
        # returns, the (empty) content is still pushed below -- confirm.
        self.d.abend("{} buf is empty".format(buffer), None)
    self._node = Node(self.d, conn=True)  # establish server cmd session
    self._node.putcmd("PUSHCLIP")
    if self.d.ll(3):
        self.d.log("push clipboard starting...")
    self._node.put(content)
    if self.d.ll(2):
        self.d.log("push clipboard end")
|
74 |
|
def longtask(self):
    """Switch the session to a separate server port for a long transfer.

    Sends the LONGTASK command, reads the port number the server answers
    with, closes the current node (freeing the command port) and replaces
    ``self._node`` with a node created for the returned port.  A returned
    port of 0 is reported through ``self.d.abend``.
    """
    command_node = self._node
    command_node.putcmd("LONGTASK")
    data_port = command_node.getnum()
    if data_port == 0:
        self.d.abend("all server net ports are busy", None)
    self._node.close()  # free server base port 0
    self._node = Node(self.d, port=data_port)
|
84 |
|
def pushfile(self):
    """Upload the paths in ``self._orig`` to the remote server (PUSHFILE).

    When no destination is set and more than one path was given, the last
    path is taken as the destination (normalised, one leading '/' removed)
    and dropped from the list of sources.  Nothing is sent when the source
    list is empty or none of the sources exists; both cases are reported
    through ``self.d.abendMsg``.
    """
    if not self._orig:
        self.d.abendMsg("nothing specified")
        return

    if not self._dest and len(self._orig) > 1:
        # with several names on input the last one is the destination dir
        self._dest = os.path.normpath(self._orig[-1])
        if self._dest.startswith('/'):
            self._dest = self._dest[1:]  # destination is relative only
        del self._orig[-1]

    if not any(os.path.exists(candidate) for candidate in self._orig):
        self.d.abendMsg("{} not found".format(self._orig))
        return

    self._node = Node(self.d, conn=True)
    self.longtask()  # move over to the data port
    self._counter = Counter(self.d, self.batchSize())  # start transfer progress display
    self._node.putcmd("PUSHFILE")
    for source in self._orig:
        if self.d.ll(3):
            self.d.log("pushfile '{}' --> '{}' starting...".format(source, self._dest))
        if not os.path.exists(source):
            self.d.warn("pushfile: {} not found".format(source))
            continue
        normalized = os.path.normpath(source)
        self.pushfile_recurse(normalized, os.path.dirname(normalized))
    self._counter.stop()
    self.d.log("pushfile end", sev=2)
|
109 |
|
def pushfile_recurse(self, fp, prefix):
    """Send one file, or a directory tree, to the node.

    fp: local path of the file or directory to send.
    prefix: leading part of ``fp`` that is cut off; the remote name is
        ``self._dest`` joined with ``fp`` relative to ``prefix``.

    For a regular file its info record is sent first, then the content in
    chunks of ``Parms.bufSize`` through the ``genput()`` generator, with
    ``self._counter`` updated per chunk.  For a directory every file found by
    ``os.walk`` is sent, each visited directory's info record follows its
    files, and the top directory's record is sent once more at the end.
    """
    relative = os.path.relpath(fp, prefix)
    if self.d.ll(4):
        self.d.log("fp={}, relative fp={}".format(fp, relative))
    remote = os.path.join(self._dest, relative)

    if not os.path.isdir(fp):
        self._node.putfileinfo(fp, remote)
        with open(fp, mode='rb') as source:
            sender = self._node.genput()
            sender.send(None)  # prime the generator
            chunk = source.read(Parms.bufSize)
            while chunk:
                self._counter.update(len(chunk))
                if self.d.ll(5):
                    self.d.log("push file: len read from file={}".format(len(chunk)))
                try:
                    sender.send(chunk)
                    chunk = source.read(Parms.bufSize)
                except Exception:
                    # any failure while sending/reading ends this file's transfer
                    break
            sender.close()
        return

    for cwd, _subdirs, filenames in os.walk(fp, topdown=True):
        for filename in filenames:
            if self.d.ll(4):
                self.d.log("cwd={}, entry={}".format(cwd, filename))
            self.pushfile_recurse(os.path.join(cwd, filename), prefix)
        self._node.putfileinfo(cwd, os.path.join(self._dest, os.path.relpath(cwd, prefix)))
    self._node.putfileinfo(fp, remote)
|
135 |
|
def pullclip(self):
    """Fetch the latest clipboard entry from the server into the local clipboard.

    Sends PULLCLIP with an empty entry name, reads the announced size and
    pipes the received data into ``xclip -i -selection clipboard``.
    """
    self._node = Node(self.d, conn=True)  # establish server cmd session
    node = self._node
    node.putcmd("PULLCLIP")
    node.putstr("")  # empty str means LAST entry in clipboard storage on server
    if self.d.ll(3):
        self.d.log("pull clipboard starting...")
    xclip = subprocess.Popen(["xclip", "-i", "-selection", "clipboard"], stdin=subprocess.PIPE)
    announced = node.getnum()
    for piece in node.genget(size=announced):
        xclip.stdin.write(piece)
    xclip.stdin.close()
    xclip.wait()
    node.close_sc()
    if self.d.ll(2):
        self.d.log("pull clipboard end")
|
148 |
|
def pullhist(self):
    """Synchronise the clipboard history with the server and browse it.

    - clipboard entries are kept on the server as individual files, which
      are synchronised into the local directory ``Parms.client_histdir``
    - after the synchronisation an index file ``.index`` is written there,
      sorted by ``sort -k2 -r`` (second field is the timestamp)
    - when sorting succeeds, ``mc`` is started on the history directory

    Side effect: the process working directory is changed to the history
    directory.
    """
    histdir = Parms.client_histdir
    self._node = Node(self.d, conn=True)  # establish server cmd session
    self._node.putcmd("PULLHIST")
    self.d.log("synchronizing clipboard history...")
    os.makedirs(histdir, mode=0o755, exist_ok=True)
    os.chdir(histdir)

    # inventory of the server: remember every entry missing locally
    toget = dict()
    fn = self._node.getfn()
    while len(fn) > 0:
        if self.d.ll(5):
            self.d.log("fn=" + fn)
        # an entry record holds the first 80 bytes of the clipboard entry,
        # its length and its timestamp; entries may be binary as well as
        # text, so they are not decoded
        entry = (self._node.getstr(decode=False), self._node.getnum(), self._node.getnum())
        if not os.path.exists(fn):
            toget[fn] = entry
            if self.d.ll(4):
                self.d.log("fn {} doesn't exists, toget[fn]={}, toget size={}".format(fn, toget[fn], len(toget)))
        fn = self._node.getfn()
    self.d.log("toget={}, toget size={}".format(toget.keys(), len(toget)), sev=4)

    # synchronisation: download every missing entry and restore its timestamp
    for fn, entry in toget.items():
        self._node.putcmd("PULLCLIP")
        self._node.putstr(fn)  # send requested entry name
        with open(fn, mode='wb') as f:
            for data in self._node.genget(size=self._node.getnum()):
                f.write(data)
        timestamp = entry[2]
        os.utime(fn, (timestamp, timestamp))
    self._node.close_sc()

    # indexing: the index file is closed deterministically once sort is done
    with open('.index', mode='w') as index:
        p = subprocess.Popen(["sort", "-k2", "-r"], stdin=subprocess.PIPE, stdout=index,
                             universal_newlines=True)
        for fn in os.listdir():
            # skip the index itself and anything that is not a regular file
            if fn == ".index" or not os.path.isfile(fn):
                continue
            with open(fn, mode="rb") as f:
                digest = f.read(80).replace(b'\n', b' ')
            try:
                digest = digest.decode()  # whatever cannot be decoded is left as bytes
            except UnicodeDecodeError:
                pass
            size = os.path.getsize(fn)
            timestamp = int(os.path.getmtime(fn))
            p.stdin.write("{} {} {: 6d} {}\n".format(
                fn, time.strftime("%Y/%m/%d.%H:%M:%S", time.gmtime(timestamp)), size, digest))
        p.stdin.close()
        p.wait()

    if p.returncode == 0:
        p = subprocess.Popen(["mc", histdir])
        p.wait()
    if self.d.ll(5):
        self.d.log("clipboard history sync finished")
|
199 |
|
200 |
|
def pullfile(self):
    """Download files/directories from the server (RECKON + PULLFILE).

    Sources and destination are chosen from ``Parms.datapaths``:
    - two or more paths: the last one is the destination dir, the others
      are the requests
    - exactly one path: it is the request, destination is ``Parms.dest``
    - no paths: request and destination come from ``Parms.orig`` /
      ``Parms.dest``

    The total batch size is first collected with RECKON for the progress
    counter, then the session moves to a data port (``longtask``) and every
    request is pulled.  An announced size below zero marks a directory.
    """
    paths = Parms.datapaths
    count = len(paths)
    if count > 1:
        self._orig = paths[:count - 1]
        self._dest = paths[-1]
    elif count:
        self._orig = paths[:1]
        self._dest = Parms.dest
    else:
        self._orig = Parms.orig
        self._dest = Parms.dest

    if not self._orig:
        self.d.abend("no filename specified, ABEND")
        return

    self._node = Node(self.d, conn=True)

    # find out the overall batch size for progress tracking
    total_bytes = 0
    file_count = 0
    dir_count = 0
    for request in self._orig:
        request = os.path.normpath(request)
        self._node.putcmd("RECKON")
        self._node.putstr(request)
        total_bytes += self._node.getnum()
        file_count += self._node.getnum()
        dir_count += self._node.getnum()

    self.longtask()
    self.d.log("pulling {} bytes in {} files and {} dirs...".format(total_bytes, file_count, dir_count), sev=3)
    self._counter = Counter(self.d, total_bytes)

    # the download of the batch itself
    for request in self._orig:
        request = os.path.normpath(request)
        self._node.putcmd("PULLFILE")
        if self.d.ll(4):
            self.d.log("pull of '{}' starting...".format(request))
        self._node.putstr(request)
        name = self._node.getfn()
        while len(name) > 0:
            target = os.path.join(self._dest, name)
            entry_size = self._node.getnum()
            entry_time = self._node.getnum()
            if self.d.ll(4):
                self.d.log("pull to '{}, dir={}'...".format(target, entry_size == -1))
            if entry_size >= 0:
                self._node.receive_file(target, entry_size, entry_time, counter=self._counter)
            else:
                self._node.receive_dir(target, entry_size, entry_time)
            name = self._node.getfn()
    self._counter.stop()
    if self.d.ll(2):
        self.d.log("pull file end")
|
246 |
|
def pulllist(self):
    """Print a listing of a server directory (PULLLIST).

    After the command is sent, file records are read in the form
    <fn_length><fn><file_size><file_timestamp> until an empty file name
    arrives; each record is printed as size, local time and name.  The
    requested path is the first entry of ``self._orig``, or "." when there
    is none.
    """
    self._node = Node(self.d, conn=True)
    node = self._node
    node.putcmd("PULLLIST")
    requested = self._orig[0] if self._orig else "."
    node.putstr(os.path.normpath(requested))
    name = node.getfn()
    while name:
        size = node.getnum()
        stamp = time.asctime(time.localtime(node.getnum()))
        print("{: 12d} {:24} {}".format(size, stamp, name))
        name = node.getfn()
        if self.d.ll(5):
            self.d.log("fn='{}'".format(str(name)))
|
262 |
|
def pushpeer(self):
    """Send the paths in ``self._orig`` directly to a remote peer.

    Nothing is sent when the source list is empty or none of the sources
    exists (reported through ``self.d.abendMsg``).  Otherwise the batch size
    is announced to the peer first, then every existing source is sent;
    the end-of-data marker is sent and the progress counter stopped even
    when sending fails.
    """
    if not self._orig:
        self.d.abendMsg("nothing specified")
        return
    if not any(os.path.exists(candidate) for candidate in self._orig):
        self.d.abendMsg("{} not found".format(self._orig))
        return

    self._node = Node(self.d, conn=True, peering=True)
    batch = self.batchSize()
    self.sendBatchSize(batch)  # tell the partner how big the outgoing batch is
    self._counter = Counter(self.d, batch)
    try:
        for source in self._orig:
            if self.d.ll(3):
                self.d.log("pushpeer: from={}".format(source))
            if not os.path.exists(source):
                self.d.warn("'{}' not found".format(source))
                continue
            normalized = os.path.normpath(source)
            self.pushpeer_recurse(normalized, os.path.dirname(normalized))
    finally:
        self._node.sendEOD()  # end of batch
        self._counter.stop()
|
284 |
|
def pushpeer_recurse(self, fp, prefix):
    """Send one file, or a directory tree, to the peer.

    fp: local path of the file or directory to send.
    prefix: leading part of ``fp`` that is cut off; the name announced to
        the peer is ``fp`` relative to ``prefix``.

    A regular file is announced with its info record and then streamed in
    chunks of ``Parms.bufSize`` through ``genput()``.  For a directory every
    file found by ``os.walk`` is sent, each visited directory's info record
    follows its files, and the top directory's record is sent once more at
    the end.
    """
    relative = os.path.relpath(fp, prefix)
    if self.d.ll(4):
        self.d.log("pushpeer recurse: fp={}, relative fp={}".format(fp, relative))

    if not os.path.isdir(fp):
        self._node.putfileinfo(fp, os.path.relpath(fp, prefix))
        with open(fp, mode='rb') as source:
            sender = self._node.genput()
            sender.send(None)  # prime the generator
            chunk = source.read(Parms.bufSize)
            while chunk:
                self._counter.update(len(chunk))
                try:
                    sender.send(chunk)
                    chunk = source.read(Parms.bufSize)
                except Exception:
                    # any failure while sending/reading ends this file's transfer
                    break
            sender.close()
    else:
        for cwd, _subdirs, filenames in os.walk(fp, topdown=True):
            for filename in filenames:
                self.pushpeer_recurse(os.path.join(cwd, filename), prefix)
            self._node.putfileinfo(cwd, os.path.relpath(cwd, prefix))
        self._node.putfileinfo(fp, os.path.relpath(fp, prefix))

    if self.d.ll(4):
        self.d.log("PUSHPEER: entry {} sent".format(fp))
|
308 |
|
def sendBatchSize(self, size):
    """Announce the size of the prepared data batch to the peer.

    The information is sent in the shape of a file info record
    (filename, filesize, timestamp): a placeholder name, ``size`` and a
    zero timestamp.
    """
    if self.d.ll(4):
        self.d.log("sending batch size to peer...")
    node = self._node
    node.putstr("dummy fn for batch size")
    for number in (size, 0):  # file size slot, then timestamp slot
        node.putnum(number)
|
318 |
|
def pullpeer(self):
    """Wait for a peer to connect and receive its batch into ``self._dest``.

    Requires ``Parms.bindhost``.  A listening node is created, a peer is
    accepted within ``Parms.peer_accept_timeout`` (the method returns
    quietly when nobody connects), and ``UDPsignalHUP()`` is called on the
    node whatever the outcome of the accept.  The first record received is a
    placeholder carrying the batch size; the following records are files
    (size >= 0) or directories (size < 0) until an empty name arrives.
    """
    if not Parms.bindhost:
        self.d.abend("local host addr for peering not specified", None)
    try:
        self._node = Node(self.d, host=Parms.bindhost, tryPort=True, conn=False, peering=True)
    except Node.AllPortsBusy:
        self.d.abend("all predefined net ports are busy", None)

    try:
        self.d.log("accepting...", sev=4)
        accepted = self._node.acc(acc_TO=Parms.peer_accept_timeout)
        self.d.log("peer {}accepted".format("" if accepted else "not "), sev=4)
        if not accepted:
            return
    except Exception:
        self._node.close_ssc()
        self.d.abend("peer pull accept", None)
        return
    finally:
        self._node.UDPsignalHUP()  # stop UDP broadcast

    node = self._node
    self.d.log("get batch size from peer (dummy fn)", sev=4)
    node.getfn()  # dummy fn in batch size info
    batchsize = node.getnum()
    self.d.log("batchsize={}".format(batchsize), sev=4)
    node.getnum()  # dummy timestamp
    progress = Counter(self.d, batchsize)

    # receive batch of file/dir objects
    name = node.getfn()
    while name:
        self.d.log("fn={}".format(name), sev=4)
        target = os.path.join(self._dest, name)
        entry_size = node.getnum()
        entry_time = node.getnum()
        if self.d.ll(4):
            self.d.log("pulling from peer to '{}'...".format(target))
        if entry_size >= 0:
            node.receive_file(target, entry_size, entry_time, counter=progress)
        elif not node.receive_dir(target, entry_size, entry_time):
            node.close_sc()
        name = node.getfn()
    progress.stop()
    node.close_sc()
    node.close_ssc()
|
359 |
|
def close_sc(self):
    """Delegate to ``close_sc()`` of the current node (``self._node``)."""
    node = self._node
    node.close_sc()
|
362 |
|
def dirsize(self, fp):
    """Return the size in bytes of ``fp`` as reported by ``du -sb``.

    fp: path handed to ``du``.

    Returns the number in front of the first tab of du's output, or -1 when
    du exits with a non-zero status or prints nothing.
    """
    p = subprocess.Popen(("du", "-sb", fp), stdout=subprocess.PIPE)
    # communicate() drains the pipe while waiting; calling wait() before
    # reading a PIPE can block once the pipe buffer fills up
    output, _ = p.communicate()
    if p.returncode != 0 or not output:
        return -1
    return int(output.decode().split("\t")[0])
|
367 |
|
def batchSize(self):
    """Return the total size in bytes of all existing paths in ``self._orig``.

    Directories are measured with ``self.dirsize``, other paths with
    ``os.path.getsize``; paths that do not exist are ignored.
    """
    return sum(
        self.dirsize(path) if os.path.isdir(path) else os.path.getsize(path)
        for path in self._orig
        if os.path.exists(path)
    )
|
374 |
|
def srv_hack(self):
    """Send a RECKON command for every path in ``self._orig`` and log the three numbers returned."""
    self._node = Node(self.d, conn=True)
    node = self._node
    for path in self._orig:
        node.putcmd("RECKON")
        node.putstr(path)
        # the server answers with three numbers, read in this order
        wholesize = node.getnum()
        fnum = node.getnum()
        dnum = node.getnum()
        self.d.log("reckon: {}, {}, {}".format(wholesize, fnum, dnum))
|
384 |
|
def hack(self):
    """Create a non-connecting node on a fixed host/port and call ``acc(1)`` on it.

    NOTE(review): host and port are hard-coded, so this looks like a
    development experiment -- confirm before relying on it.
    """
    options = dict(conn=False, tryPort=False, port=1111, host='10.0.1.47')
    self._node = Node(self.d, **options)
    self._node.acc(1)
|
388 |
|
389 def hack_recurse(self, realfp, pref): |
|
390 self.d.log("hack realfp={}, relfp={}".format(realfp, os.path.relpath(realfp, pref))) |
|
391 if os.path.isdir(realfp): |
|
392 for cwd, void, entries in os.walk(realfp, topdown=True): |
|
393 for entry in entries: |
|
394 self.hack_recurse2(os.path.join(cwd, entry), pref) |
|
395 self.d.log("node.putfileinfo({}, {})".format(cwd, os.path.relpath(cwd, pref))) |
|
396 self.d.log("node.putfileinfo({}, {})".format(realfp, os.path.relpath(realfp, pref))) |
|
397 else: |
|
398 self.d.log("file {} processing".format(realfp)) |