|
1 #!/usr/bin/python3 |
|
2 # coding=utf8 |
|
3 |
|
4 # ring nonssl: cca 8000 rings/sec*3nodes |
|
5 # ring ssl: cca 2600 rings/sec*3nodes |
|
6 # mash nonssl: cca 2000 mashes/sec*3nodes |
|
7 # mash ssl: cca 300 mashes/sec*3nodes |
|
8 # u mashe je čas na přenosy úměrný počtu uzlů, |
|
9 # kdežto čas na connect/close je úměrný počtu spojů tj. kvadrátu počtu uzlů |
|
10 |
|
11 import time |
|
12 import socket |
|
13 import errno |
|
14 import os |
|
15 import signal |
|
16 import sys |
|
17 import pickle |
|
18 import ssl |
|
19 import select |
|
20 import random |
|
21 import multiprocessing |
|
22 import threading |
|
23 |
|
24 |
|
25 class Debug(): |
|
26 def __init__(self, debid): |
|
27 self.debid = debid |
|
28 def log(self, level, *msg): |
|
29 if level <= maxDebLev: |
|
30 log_lock.acquire() |
|
31 print("{:10.6f} {}:".format(time.time()-t0, self.debid), *msg, file=sys.stderr) |
|
32 sys.stderr.flush() |
|
33 log_lock.release() |
|
34 def abend(self, s): |
|
35 self.log(0, s) |
|
36 os.kill(0, signal.SIGTERM) |
|
37 |
|
38 |
|
39 class Node(Debug): |
|
40 def __init__(self, debid, forwarding, topo, port, p0, pn, issl): |
|
41 Debug.__init__(self, "{} node {}".format(debid, port)) |
|
42 self.topo = topo |
|
43 self.locPort = port |
|
44 self.p0 = p0 |
|
45 self.pn = pn |
|
46 self.ssc = None |
|
47 self.cli_side = {} |
|
48 self.srv_side = {} |
|
49 self.kicker = (self.locPort == self.pn) |
|
50 self.forwarding = forwarding # in forwarding kicker task indicates when TTL reached 0 |
|
51 self.closing = False |
|
52 self.payload = None |
|
53 self.issl = issl |
|
54 if issl: |
|
55 self.log(4, "setting SSL context...") |
|
56 self.sslCert = cePath + "/certs/{}.pem".format(port) |
|
57 self.sslKey = cePath + "/keys/{}.key".format(port) |
|
58 try: |
|
59 self.ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) |
|
60 self.ctx.verify_mode = ssl.CERT_REQUIRED |
|
61 self.log(5, "cert={}, key={}".format(self.sslCert, self.sslKey)) |
|
62 self.ctx.load_cert_chain(self.sslCert, self.sslKey) |
|
63 self.ctx.load_verify_locations(None, caPath) |
|
64 except ssl.SSLError as e: self.abend("SSL context: {}".format(str(e))) |
|
65 def run(self): |
|
66 self.bind() |
|
67 self.payload = Data(self.debid, self.locPort) |
|
68 if self.kicker: |
|
69 nxt = self.next_node() |
|
70 self.log(1, "kicker ready to send '{}' to {}".format(self.payload.digest(), nxt)) |
|
71 if not nxt in self.cli_side: self.conn(nxt) |
|
72 self.payload.put(self.cli_side[nxt]) |
|
73 if self.forwarding[0].value: wait_list = (self.ssc,) |
|
74 while wait_list: |
|
75 self.log(4, "select...") |
|
76 self.log(5, "waitlist:", *(sc.fileno() for sc in wait_list)) |
|
77 ready_list = select.select(wait_list, (), (), sel_TO) |
|
78 self.log(5, "readylist:", *(sc.fileno() for sc in ready_list[0])) |
|
79 for sc in ready_list[0]: |
|
80 self.log(5, "sc {} ready...".format(sc.fileno())) |
|
81 if sc == self.ssc: sc = self.acc() |
|
82 self.forward(sc) |
|
83 wait_list = () |
|
84 self.log(4, "forwarding={}".format(self.forwarding[0].value)) |
|
85 if self.forwarding[0].value: wait_list += (self.ssc,) # when off, no new connection will come on ssc |
|
86 else: self.close_cli() |
|
87 for sc in self.srv_side.values(): wait_list += (sc,) |
|
88 self.close_srv() |
|
89 signal.signal(signal.SIGUSR2, sighand) |
|
90 last = 0 |
|
91 ctr_lock.acquire() |
|
92 active.value -= 1 |
|
93 if active.value == 0: last = 1 |
|
94 ctr_lock.release() |
|
95 if last: os.kill(0, signal.SIGUSR2) |
|
96 else: signal.pause() |
|
97 self.log(2, "ended") |
|
98 os._exit(0) |
|
99 def forward(self, sc): |
|
100 if not self.payload.get(sc): |
|
101 self.log(5, "delete srv_side[{}]".format(sc.fileno())) |
|
102 del self.srv_side[sc] |
|
103 self.log(4, "closing {}...".format(sc.fileno())) |
|
104 sc.close() |
|
105 else: |
|
106 self.log(5, "received data from {}".format(self.payload.rport)) |
|
107 ctr_lock.acquire(); |
|
108 forwards.value += 1; |
|
109 ctr_lock.release() |
|
110 if self.kicker: |
|
111 self.payload.dttl() |
|
112 self.log(3, "received from {}: {}, ttl={}".format(self.topo, self.payload.digest(), self.payload.ttl)) |
|
113 if self.payload.ttl <= 0: |
|
114 self.log(1, "received after passing all {}: {}".\ |
|
115 format("mashes" if self.topo == "mash" else "rings", self.payload.digest())) |
|
116 self.forwarding[0].value = 0 |
|
117 return |
|
118 nxt = self.next_node() |
|
119 self.log(5, "forwarding to {}...".format(nxt)) |
|
120 if pacing: time.sleep(pace) |
|
121 if not nxt in self.cli_side: self.conn(nxt) |
|
122 self.payload.put(self.cli_side[nxt]) |
|
123 self.log(5, "forwarded to {}".format(nxt)) |
|
124 def next_node(self): |
|
125 if self.topo == "ring": |
|
126 if self.kicker: nxt = self.p0 |
|
127 else: nxt = self.locPort + 1 |
|
128 else: |
|
129 nxt = self.locPort |
|
130 while nxt == self.locPort: |
|
131 nxt = random.randint(self.p0, self.pn) |
|
132 return nxt |
|
133 def bind(self): |
|
134 self.log(4, "binding...") |
|
135 try: |
|
136 self.ssc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
137 self.ssc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
|
138 except Exception as e: |
|
139 self.abend("ssc alloc: {}".format(e.strerror)) |
|
140 if self.issl: |
|
141 self.log(4, "ssc SSL wrap") |
|
142 try: self.ssc = self.ctx.wrap_socket(self.ssc) |
|
143 except ssl.SSLError as e: self.abend("ssc SSL wrap: {}".format(str(e))) |
|
144 try: |
|
145 self.ssc.bind(("127.0.0.1", self.locPort)) |
|
146 self.ssc.listen(1) |
|
147 except Exception as e: self.abend("bind: {}".format(e.strerror)) |
|
148 self.log(2, "bound") |
|
149 def conn(self, remPort): |
|
150 self.log(4, "connecting to {}...".format(remPort)) |
|
151 try: sc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
152 except Exception as e: self.abend("socket alloc: {}".format(e.strerror)) |
|
153 if self.issl: |
|
154 self.log(4, "sc SSL wrap") |
|
155 try: sc = self.ctx.wrap_socket(sc) |
|
156 except ssl.SSLError as e: self.abend("sc SSL wrap: {}".format(str(e))) |
|
157 retry = connThreshold |
|
158 connected = False |
|
159 while not connected and retry > 0: |
|
160 try: |
|
161 sc.connect(("127.0.0.1", remPort)) |
|
162 connected = True |
|
163 except Exception as e: |
|
164 if e.errno == errno.ECONNREFUSED: |
|
165 retry = retry - 1 |
|
166 time.sleep(conn_TO) |
|
167 else: self.abend("connect: {}".format(str(e))) |
|
168 if retry == 0: self.abend("connection refused, threshold {} reached".format(connThreshold)) |
|
169 ctr_lock.acquire(); |
|
170 connects.value += 1; |
|
171 ctr_lock.release() |
|
172 try: self.cli_side[remPort] = sc.makefile("wb") |
|
173 except Exception as e: self.abend("client side makefile: {}".format(str(e))) |
|
174 self.log(2, "connected to {} after {} retries".format(remPort, connThreshold - retry)) |
|
175 def acc(self): |
|
176 self.log(4, "accepting...") |
|
177 try: |
|
178 ac = self.ssc.accept() |
|
179 sc = ac[0] |
|
180 except Exception as e: self.abend("accept: {}".format(str(e))) |
|
181 try: sc = sc.makefile("rb") |
|
182 except Exception as e: self.abend("srv side makefile: {}".format(str(e))) |
|
183 self.srv_side[sc] = sc |
|
184 self.log(2, "accepted on sc={}, addr={}".format(sc.fileno(), ac[1])) |
|
185 return sc |
|
186 def close_srv(self): |
|
187 self.log(5, "closing ssc...") |
|
188 if self.ssc: self.ssc.close() |
|
189 def close_cli(self): |
|
190 def do_close(): |
|
191 self.log(5, "closing clients...") |
|
192 scs = self.cli_side.values() |
|
193 self.cli_side.clear() |
|
194 for sc in scs: sc.close() |
|
195 self.log(4, "all clients closed") |
|
196 if not self.closing: |
|
197 threading.Thread(target = do_close, name = "client {} close".format(self.locPort)).start() |
|
198 self.closing = True |
|
199 |
|
200 |
|
201 class Data(Debug): |
|
202 def __init__(self, debid, port): |
|
203 self.debid = debid + " payload" |
|
204 self.clear() |
|
205 self.ttl = ittl |
|
206 self.lport = port |
|
207 self.rport = 0 |
|
208 self.text = "" |
|
209 def clear(self): |
|
210 (self.ttl, self.rport, self.text) = 3 * (None,) |
|
211 def put(self, sc): |
|
212 self.log(5, "sending via {}...".format(sc.fileno())) |
|
213 if self.ttl == None: self.ttl = ittl |
|
214 if self.text == None: self.text = itext |
|
215 try: |
|
216 pickle.dump((self.ttl, self.lport, self.text), sc) |
|
217 sc.flush() |
|
218 except Exception as e: self.abend("send: {}".format(str(e))) |
|
219 def get(self, sc): |
|
220 self.log(5, "reading from {}...".format(sc.fileno())) |
|
221 self.clear() |
|
222 try: |
|
223 (self.ttl, self.rport, self.text) = pickle.load(sc) |
|
224 return True |
|
225 except Exception as e: |
|
226 if isinstance(e, EOFError): return False |
|
227 else: self.abend("receive: {}".format(str(e))) |
|
228 def dttl(self): |
|
229 self.ttl -= 1 |
|
230 return self.ttl |
|
231 def digest(self): |
|
232 return self.text if len(self.text) < 24 else self.text[0:8]+"--------"+self.text[-8:] |
|
233 def toString(self): |
|
234 return "ttl={}, from port={}, text={}".\ |
|
235 format(str(self.ttl), str(self.rport), self.digest()) |
|
236 |
|
237 class Constellation(Debug): |
|
238 def __init__(self, issl, topo, p0, n): |
|
239 Debug.__init__(self, "{}SSL {}".format("" if issl else "non", topo.upper())) |
|
240 def run(self, issl, topo, p0, n): |
|
241 signal.signal(signal.SIGUSR2, signal.SIG_IGN) |
|
242 forwarding = [multiprocessing.Value('i', 1, lock=False)] # list is passed by reference |
|
243 if n == 1: |
|
244 self.log(0, "one-node configuration is not implemented") |
|
245 else: |
|
246 self.log(1, "{} nodes starting...".format(n)) |
|
247 p0 += 500 if issl else 0 |
|
248 pn = p0 + n - 1 |
|
249 for port in range(p0, p0 + n): |
|
250 pid = os.fork() |
|
251 if not pid: Node(self.debid, forwarding, topo, port, p0, pn, issl).run() |
|
252 else: self.log(4, "node {} started in process {}".format(port, pid)) |
|
253 self.log(2, "all nodes established") |
|
254 while 1: |
|
255 try: |
|
256 p = os.wait() |
|
257 if p[1] & 255: |
|
258 self.log(4, "pid {} killed by {}".format(p[0], p[1] & 255)) |
|
259 else: |
|
260 self.log(4, "pid {} returned {}".format(p[0], p[1] >> 8)) |
|
261 except: break |
|
262 os._exit(0) |
|
263 |
|
264 def ga(key, default): |
|
265 return os.environ[key] if key in os.environ else default |
|
266 def gi(key, default): |
|
267 return int(ga(key, default)) |
|
268 def sighand(signal, frame): |
|
269 pass |
|
270 |
|
271 debug = Debug("client/server demo") |
|
272 log_lock = multiprocessing.Lock() |
|
273 ctr_lock = multiprocessing.Lock() |
|
274 forwards = multiprocessing.Value('i', 0) |
|
275 connects = multiprocessing.Value('i', 0) |
|
276 active = multiprocessing.Value('i', 0) |
|
277 t0 = time.time() |
|
278 maxDebLev = gi('DEB', 0) |
|
279 mn = rn = gi('N', 0) |
|
280 rp0 = gi('RP0', 11000) |
|
281 rn = gi('RN', 3) |
|
282 mp0 = gi('MP0', 12000) |
|
283 mn = gi('MN', 3) |
|
284 itext = ga('T', "bla bla") |
|
285 ittl = gi('TTL', 3) |
|
286 pace = float(os.environ["P"]) if "P" in os.environ else 0 |
|
287 pacing = 1 if pace > 0 else 0 |
|
288 random.seed(gi('RS', 0)) |
|
289 connThreshold = 77 |
|
290 conn_TO = 0.01 |
|
291 sel_TO = 1 |
|
292 issl = gi('SSL', 0) |
|
293 caPath = "/home/local/etc/ssl/certs/" |
|
294 sslPathSuff = "/../CS/" |
|
295 cePath = os.environ["CEP"] if "CEP" in os.environ else os.path.dirname(sys.argv[0]) + sslPathSuff |
|
296 active.value = mn + rn |
|
297 if issl > 1: active.value *= 2 |
|
298 signal.signal(signal.SIGUSR2, signal.SIG_IGN) |
|
299 debug.log(1, "pgm={}, ttl={}, pace={}, seed={}, SSL mask={}, debug={}".format(sys.argv[0], ittl, pace, gi('RS', 0), issl, maxDebLev)) |
|
300 if issl > 0: debug.log(3, "ssl path: {}, CA path: {}".format(cePath, caPath)) |
|
301 |
|
302 if issl < 2: issl = (issl,) |
|
303 else: issl = (0, 1) |
|
304 if ittl > 0: |
|
305 for ss in issl: |
|
306 topo = ("mash", "ring") |
|
307 p0 = (mp0, rp0) |
|
308 n = (mn, rn) |
|
309 for p in zip(topo, p0, n): |
|
310 if not os.fork(): |
|
311 Constellation(ss, *p).run(ss, *p) |
|
312 while 1: |
|
313 try: os.wait() |
|
314 except: break |
|
315 debug.log(1, "final balance: forwards={}, connections={}".format(forwards.value, connects.value)) |