|
0
|
1 |
#include "CS.h"
|
|
|
2 |
|
|
|
3 |
CSP csP;
|
|
|
4 |
|
|
|
5 |
// Signal handler that deliberately does nothing; NodeC::run() installs it for SIGUSR2.
void sighandle(int sig) {
    (void)sig;
}
|
|
|
6 |
/**
 * Abnormal end: logs a backtrace, sends SIGTERM to the process group and exits.
 *
 * @param deP debug context used by LOG and for printing the backtrace
 *
 * The netstat snapshot below has been disabled for a while; the waitpid() that
 * belonged to it is disabled as well, because without the fork() it was called
 * with an uninitialized pid.
 */
void abend(DebugP deP) {
    //LOG(0, "ABORT, netstat:");
    //if(!(pid = fork())) {
    //    execl("/bin/bash", "/bin/bash", "-c", "netstat -n --inet -a | grep 1[123][0-9][0-9][0-9] >net_stat", (char *) NULL);
    //    exit(EXIT_SUCCESS);
    //}
    //waitpid(pid, NULL, 0);
    LOG(0, "ABORT, backtrace:");
    deP->back_trace();
    kill(0, SIGTERM);   // pid 0: every process in the caller's process group
    exit(EXIT_FAILURE);
}
|
|
|
19 |
/**
 * Formats the IPv4 address and port stored in a socket address.
 *
 * @param ai_addr socket address; sa_data is read as 2 bytes of port (network byte
 *                order) followed by 4 address bytes
 * @return malloc'ed string "a.b.c.d port" that the caller must free(), or NULL
 *         if the allocation fails
 */
char *gpa(struct sockaddr *ai_addr) { // returns string with IP4 address & port assigned to the socket
    const size_t bufLen = 64;
    char *s = (char*) malloc(bufLen);
    if (s == NULL) return NULL;
    // Copy the port out instead of casting the char array to unsigned short*.
    unsigned short port;
    memcpy(&port, ai_addr->sa_data, sizeof(port));
    const unsigned char *a = (const unsigned char*) ai_addr->sa_data + 2;
    snprintf(s, bufLen, "%u.%u.%u.%u %hu",
             (unsigned) a[0], (unsigned) a[1], (unsigned) a[2], (unsigned) a[3], ntohs(port));
    return s;
}
|
|
|
26 |
void gai(int level, struct addrinfo *ai, DebugP deP) { // logs assigned IP4 addresses from addrinfo chain
|
|
|
27 |
struct addrinfo *sa = ai;
|
|
|
28 |
if (deP->debug >= level) do {
|
|
|
29 |
char *s = gpa(sa->ai_addr);
|
|
|
30 |
strcpy(deP->s, s); free(s);
|
|
|
31 |
deP->deb(level);
|
|
|
32 |
} while ((sa = sa->ai_next));
|
|
|
33 |
fflush(stderr);
|
|
|
34 |
}
|
|
|
35 |
void ssl_err(char *s, DebugP deP) {
|
|
|
36 |
long e = ERR_get_error();
|
|
|
37 |
while(e) {
|
|
|
38 |
LOG(0, "%s: %s", s, ERR_error_string(e, NULL));
|
|
|
39 |
e = ERR_get_error();
|
|
|
40 |
}
|
|
|
41 |
}
|
|
|
42 |
// Reads environment variable `a` as an integer via atoi(); yields 0 when it is not set.
static int getArg(const char *a) {
    const char *value = getenv(a);
    if (value == NULL) {
        return 0;
    }
    return atoi(value);
}
|
|
|
44 |
// Default header: ttl starts at zero; no other member is touched here.
HeaderS::HeaderS() : ttl(0) {}
|
|
|
46 |
// Header with the given ttl; no other member is touched here.
HeaderS::HeaderS(int ttl) : ttl(ttl) {}
|
|
|
48 |
int HeaderS::len() {
|
|
|
49 |
return sizeof(HeaderS); }
|
|
|
50 |
// Default payload: the string "EMPTY" is written starting at the `text` member.
// NOTE(review): `text` is addressed as the first byte of a longer buffer, so this
// presumably relies on the object sitting inside an over-sized allocation - confirm.
PayloadS::PayloadS() {
    char *dst = &text;
    strcpy(dst, "EMPTY");
}
|
|
|
52 |
// Payload carrying `text`: stamps `ts` with the current time and copies the string
// (including its terminator) to the location of the `text` member.
PayloadS::PayloadS(const char *text) {
    this->ts = time(NULL);
    char *dst = &(this->text);
    strcpy(dst, text);
}
|
|
|
55 |
// Returns 1 when this payload's text equals the text of `p`, otherwise 0.
int PayloadS::check(PayloadP p) {
    const char *mine = &text;
    const char *theirs = &p->text;
    return strcmp(mine, theirs) == 0 ? 1 : 0;
}
|
|
|
57 |
char *PayloadS::deliver() {
|
|
|
58 |
return &text; }
|
|
|
59 |
// Short printable form of the payload text: texts shorter than 24 characters are
// returned whole, longer ones as first 8 chars + "--------" + last 8 chars.
string PayloadS::digest() {
    string whole = string(&text);
    const size_t n = whole.size();
    if (n >= 24) {
        return whole.substr(0, 8) + string("--------") + whole.substr(n - 8, 8);
    }
    return whole;
}
|
|
|
63 |
// Overwrites the first character of the payload text with '?'.
void PayloadS::sabotage() {
    char *firstChar = &text;
    *firstChar = '?';
}
|
|
|
65 |
int PayloadS::len() {
|
|
|
66 |
return sizeof(PayloadS) + strlen(&text); }
|
|
|
67 |
int ContainerS::len() {
|
|
|
68 |
return hdr.len() + payl.len(); }
|
|
|
69 |
// Default constructor: sets up nothing; members stay default-initialized.
DataC::DataC() = default;
|
|
|
70 |
// Creates an empty data instance. One raw buffer is allocated, sized for a header,
// a payload structure and csP->text.size() extra bytes; a default header and a
// default payload are then constructed in place inside it.
DataC::DataC(DebugP callerDeP) {
    deP = new DebugC();
    DEBID("%s DATA", callerDeP->debid);

    dataLen = sizeof(HeaderS) + sizeof(PayloadS) + csP->text.size();
    void *raw = operator new(dataLen);
    contP = static_cast<ContainerP>(raw);

    new(&contP->hdr) HeaderS();
    new(&contP->payl) PayloadS();

    LOG(5, "Empty Data instance established");
}
|
|
|
79 |
void DataC::load(int ttl, const char *text) {
|
|
|
80 |
new(&contP->hdr) HeaderS(ttl);
|
|
|
81 |
new(&contP->payl) PayloadS(text);
|
|
|
82 |
LOG(5, "Data instance loaded with payload");
|
|
|
83 |
return;
|
|
|
84 |
}
|
|
|
85 |
// Returns a copy of the payload text.
string DataC::unld() {
    const char *raw = &(contP->payl.text);
    return string(raw);
}
|
|
|
87 |
// Returns the payload's digest string.
string DataC::digest() {
    auto &payload = contP->payl;
    return payload.digest();
}
|
|
|
89 |
int DataC::dttl() {
|
|
|
90 |
return --contP->hdr.ttl; }
|
|
|
91 |
int DataC::ttl() {
|
|
|
92 |
return contP->hdr.ttl; }
|
|
|
93 |
bool DataC::dataOk() {
|
|
|
94 |
return csP->text == string(&(contP->payl.text)); }
|
|
|
95 |
int DataC::ts() {
|
|
|
96 |
return contP->hdr.ts; }
|
|
|
97 |
int DataC::remPort(int remPort) {
|
|
|
98 |
return (contP->hdr.remPort = remPort); }
|
|
|
99 |
int DataC::remPort() {
|
|
|
100 |
return contP->hdr.remPort; }
|
|
|
101 |
/**
 * Sets up one node of a constellation.
 *
 * Copies topology, port range and the shared forwarding flag from `co`, allocates
 * one client-side and one server-side socket slot per node (all marked unused with
 * sc = -1) and, when SSL is on, creates the SSL context with this node's private
 * key, certificate and the trusted CA path.
 *
 * @param co   constellation this node belongs to
 * @param port local port of this node; the node whose port equals co->first is the kicker
 */
NodeC::NodeC(ConstellationP co, int port) {
    deP = new DebugC();
    DEBID("%sSSL %s node %d", co->ssl ? "" : "non", co->topo==mash ? "MASH" : "RING", port);
    LOG(4, "intializing ...");
    data = DataC(deP);
    topo = co->topo;
    locPort = port;
    first = co->first;
    nodes = co->nodes;
    last = first + nodes - 1;
    cliSides = new SocketS[nodes];
    srvSides = new SocketS[nodes];
    for(int k = 0; k < nodes; k++) cliSides[k].sc = srvSides[k].sc = -1;
    kicker = (locPort == first);
    forwP = co->forwP;
    closing = 0;
    ssl = co->ssl;
    ssc = 0;
    if(ssl) {
        // The path prefix comes from the environment (CEP) or from argv[0], so its
        // length is not bounded: format with snprintf and reject truncation.
        char s[1024];
        int n;
        SSL_load_error_strings();
        SSL_library_init();
        LOG(4, "setting SSL contex...");
        if(!(ctxP = SSL_CTX_new(SSLv23_method()))) SSLERR("new SSL CTX");
        SSL_CTX_set_mode(ctxP, SSL_MODE_AUTO_RETRY);
        // SSL will claim partner certificate. SSL_VERIFY_FAIL_IF_NO_PEER_CERT has no
        // effect unless SSL_VERIFY_PEER is set as well, so both are requested.
        SSL_CTX_set_verify(ctxP, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
        n = snprintf(s, sizeof(s), "%s/keys/%d.key", csP->cePath.c_str(), locPort);
        if(n < 0 || n >= (int)sizeof(s)) HARDERR("SSL key file path too long");
        LOG(5, "SSL private key used: %s", s);
        if(SSL_CTX_use_PrivateKey_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file");
        n = snprintf(s, sizeof(s), "%s/certs/%d.pem", csP->cePath.c_str(), locPort);
        if(n < 0 || n >= (int)sizeof(s)) HARDERR("SSL cert file path too long");
        LOG(5, "SSL certificate used: %s", s);
        if(SSL_CTX_use_certificate_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file");
        LOG(5, "SSL: CApath: %s", csP->caPath.c_str());
        if(SSL_CTX_load_verify_locations(ctxP, NULL, csP->caPath.c_str()) != 1) SSLERR("hh's thrusted certs path");
    }
    LOG(5, "initalized");
}
|
|
|
138 |
int NodeC::run() {
|
|
|
139 |
LOG(5, "binding, kicker=%d", kicker);
|
|
|
140 |
bindN();
|
|
|
141 |
if(kicker) {
|
|
|
142 |
data.load(csP->ttl, csP->text.c_str());
|
|
|
143 |
int sci, next;
|
|
|
144 |
next = next_node(); sci = next - first;
|
|
|
145 |
LOG(2, "ready to initial send %s, len=%d to node %d", data.digest().c_str(), data.dataLen, next);
|
|
|
146 |
conn(sci, next);
|
|
|
147 |
putN(sci);
|
|
|
148 |
}
|
|
|
149 |
mainLoop();
|
|
|
150 |
LOG(5, "closing ssc");
|
|
|
151 |
close(ssc);
|
|
|
152 |
if(closing) closingThread.join(); // wait for closing thread
|
|
|
153 |
struct sigaction sigact;
|
|
|
154 |
sigfillset(&sigact.sa_mask);
|
|
|
155 |
sigact.sa_handler = sighandle;
|
|
|
156 |
if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction");
|
|
|
157 |
if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait");
|
|
|
158 |
if(--csP->shP->act == 0) {
|
|
|
159 |
if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post");
|
|
|
160 |
kill(0, SIGUSR2);
|
|
|
161 |
}
|
|
|
162 |
else {
|
|
|
163 |
if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
|
|
|
164 |
pause();
|
|
|
165 |
}
|
|
|
166 |
int exitRc = EXIT_SUCCESS;
|
|
|
167 |
if(kicker && !data.dataOk()) {
|
|
|
168 |
SOFTERR("input and output differ");
|
|
|
169 |
exitRc = EXIT_FAILURE;
|
|
|
170 |
}
|
|
|
171 |
LOG(2, "ended");
|
|
|
172 |
exit(exitRc);
|
|
|
173 |
}
|
|
|
174 |
void NodeC::mainLoop() {
|
|
|
175 |
fd_set rs;
|
|
|
176 |
int nfds;
|
|
|
177 |
struct timeval t = {1, 0};
|
|
|
178 |
nfds = 0; FD_ZERO(&rs);
|
|
|
179 |
if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; }
|
|
|
180 |
while(nfds) {
|
|
|
181 |
t.tv_sec = csP->selTO;
|
|
|
182 |
int rc;
|
|
|
183 |
rc = select(nfds, &rs, NULL, NULL, &t);
|
|
|
184 |
if(rc < 0 && errno != EINTR) SYSERR("select");
|
|
|
185 |
if(rc > 0) {
|
|
|
186 |
if(FD_ISSET(ssc, &rs)) {
|
|
|
187 |
int i;
|
|
|
188 |
for(i=0; srvSides[i].sc > -1 && i < nodes; i++); // find unused slot for accept
|
|
|
189 |
if(i == nodes) HARDERR("can't accept, all slots in use");
|
|
|
190 |
LOG(5, "slot for accept=%d", i);
|
|
|
191 |
acc(i);
|
|
|
192 |
forward(i);
|
|
|
193 |
}
|
|
|
194 |
else
|
|
|
195 |
for(int i = 0; i < nodes; i++) {
|
|
|
196 |
if(srvSides[i].sc > -1 && FD_ISSET(srvSides[i].sc, &rs)) forward(i); }
|
|
|
197 |
}
|
|
|
198 |
FD_ZERO(&rs); nfds = 0; t.tv_sec = 1;
|
|
|
199 |
for(int i = 0; i < nodes; i++) {
|
|
|
200 |
int sc = srvSides[i].sc;
|
|
|
201 |
if(sc > -1) { FD_SET(sc, &rs); if(sc >= nfds) nfds = sc + 1; }
|
|
|
202 |
}
|
|
|
203 |
if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; }
|
|
|
204 |
}
|
|
|
205 |
}
|
|
|
206 |
void NodeC::forward(int sci) {
|
|
|
207 |
int next, scn;
|
|
|
208 |
if(getN(sci)) {
|
|
|
209 |
LOG(5, "received data from %u", data.remPort());
|
|
|
210 |
if(kicker) {
|
|
|
211 |
LOG(3, "received from node %u: %s, ttl=%d", data.remPort(), data.digest().c_str(), data.ttl());
|
|
|
212 |
if(data.dttl() <= 0) {
|
|
|
213 |
LOG(1, "received after passing all %s: %s", topo==mash ? "mashes" : "rings", data.digest().c_str());
|
|
|
214 |
closeClients();
|
|
|
215 |
*(forwP) = 0;
|
|
|
216 |
LOG(4, "leaving forward closing");
|
|
|
217 |
return;
|
|
|
218 |
}
|
|
|
219 |
}
|
|
|
220 |
next = next_node(); scn = next - first;
|
|
|
221 |
LOG(5, "forwarding len=%d to %d --->", data.dataLen, next);
|
|
|
222 |
if(cliSides[scn].sc < 0) conn(scn, next);
|
|
|
223 |
if(*(forwP)) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); }
|
|
|
224 |
putN(scn);
|
|
|
225 |
LOG(5, "forwarded to %u", next);
|
|
|
226 |
}
|
|
|
227 |
else {
|
|
|
228 |
closeClients();
|
|
|
229 |
closeSocket(sci, server);
|
|
|
230 |
}
|
|
|
231 |
return;
|
|
|
232 |
}
|
|
|
233 |
/**
 * Creates the listening socket `ssc` and binds it to the local port (IPv4, TCP,
 * wildcard address, SO_REUSEADDR, backlog 1).
 */
void NodeC::bindN() {
    LOG(4, "binding...");
    // Hints live on the stack. They used to be malloc'ed and the pointer was then
    // overwritten by getaddrinfo's result, leaking both the hints and the result.
    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = 0;
    hints.ai_flags = AI_PASSIVE;
    char s[64];
    snprintf(s, sizeof(s), "%d", locPort);
    struct addrinfo *sa = NULL;
    int e;
    if((e = getaddrinfo(NULL, s, &hints, &sa)) != 0) HARDERR(gai_strerror(e));
    GAI(4, sa);
    if((ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc");
    int opt = 1;
    if(setsockopt(ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options");
    if(bind(ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind");
    if(listen(ssc, 1) < 0) SYSERR("listen");
    if(sa != NULL) freeaddrinfo(sa);
    LOG(2, "bound to %d", locPort);
}
|
|
|
253 |
/**
 * Connects client-side slot `i` to the node listening on `remPort` on localhost.
 *
 * connect() is retried up to csP->connThreshold times while the peer refuses the
 * connection, sleeping csP->connTO microseconds between attempts; when all attempts
 * fail the process exits with EXIT_FAILURE. On success the shared connection
 * counter is incremented and, when SSL is on, the SSL handshake is performed.
 *
 * @param i       index into cliSides
 * @param remPort remote port to connect to
 */
void NodeC::conn(int i, int remPort) {
    DebugP deP = new DebugC();
    DEBID("%s to %u", this->deP->debid, remPort);
    cliSides[i].remPort = remPort;
    int e;
    LOG(4, "connecting to %u...", remPort);

    // Stack hints and a separate result pointer: nothing leaks, and the result can
    // be released with freeaddrinfo() once the connection is up.
    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = 0;
    hints.ai_flags = 0;
    char port[16];
    snprintf(port, sizeof(port), "%d", remPort);
    struct addrinfo *ai = NULL;
    if((e = getaddrinfo("localhost", port, &hints, &ai)) != 0) HARDERR(gai_strerror(e));
    GAI(3, ai);
    if((cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc");

    // An explicit flag records success; testing the retry counter afterwards
    // misreported a connection that succeeded on the very last attempt.
    int retry = csP->connThreshold;
    bool connected = false;
    while(!connected && retry-- > 0) {
        if(connect(cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) {
            if(errno != ECONNREFUSED) SYSERR("connect");
            usleep(csP->connTO);
        }
        else connected = true;
    }
    if(ai != NULL) freeaddrinfo(ai);
    if(!connected) {
        LOG(0, "connection refused threshold %d reached", csP->connThreshold);
        exit(EXIT_FAILURE);
    }

    if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
    csP->shP->conns++;
    if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");

    struct sockaddr peer;
    socklen_t l = sizeof(peer);
    if(getpeername(cliSides[i].sc, &peer, &l) < 0) SYSERR("getpeername");
    char *peerStr = gpa(&peer);      // malloc'ed by gpa(), released below
    LOG(4, "peer: %s on sc=%d", peerStr ? peerStr : "?", cliSides[i].sc);
    free(peerStr);

    if(ssl) {
        ERR_clear_error();
        if(!(cliSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL");
        if(!SSL_set_fd(cliSides[i].sslP, cliSides[i].sc)) SSLERR("client SSL set fd");
        if((e = SSL_connect(cliSides[i].sslP)) < 1) {
            switch(SSL_get_error(cliSides[i].sslP, e)) {
                case SSL_ERROR_SYSCALL:
                    ssl_err((char*)"SSL connect", deP);
                    if(e == 0) { LOG(0, "SSL connect: EOF on socket"); }
                    else { LOG(0, "SSL connect: %s (%d)", strerror(errno), errno); }
                    abend(deP);
                    break;
                default:
                    SSLERR("SSL connect");
                    break;
            }
        }
    }
    LOG(2, "connected via sc=%d after %d retries", cliSides[i].sc, csP->connThreshold - (retry + 1));
}
|
|
|
306 |
void NodeC::acc(int i) {
|
|
|
307 |
LOG(4, "accepting...");
|
|
|
308 |
if((srvSides[i].sc = accept(ssc, NULL, NULL)) < 0) SYSERR("accept");
|
|
|
309 |
socklen_t l = sizeof(struct sockaddr);
|
|
|
310 |
struct sockaddr *sa = (sockaddr*)malloc(l);
|
|
|
311 |
if(getpeername(srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
|
|
|
312 |
LOG(4, "peer: %s on sc=%d", gpa(sa), srvSides[i].sc); free(sa);
|
|
|
313 |
if(ssl) {
|
|
|
314 |
int e;
|
|
|
315 |
ERR_clear_error();
|
|
|
316 |
if(!(srvSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL");
|
|
|
317 |
if(!SSL_set_fd(srvSides[i].sslP, srvSides[i].sc)) SSLERR("server SSL set fd");
|
|
|
318 |
if((e = SSL_accept(srvSides[i].sslP)) < 1) {
|
|
|
319 |
switch(SSL_get_error(srvSides[i].sslP, e)) {
|
|
|
320 |
case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break;
|
|
|
321 |
default: SSLERR("SSL accept"); break;
|
|
|
322 |
}
|
|
|
323 |
}
|
|
|
324 |
}
|
|
|
325 |
LOG(2, "accepted");
|
|
|
326 |
}
|
|
|
327 |
void NodeC::closeSocket(int i, nodeside side) {
|
|
|
328 |
SocketP sc;
|
|
|
329 |
if(side) sc = srvSides; else sc = cliSides;
|
|
|
330 |
LOG(5, "closing sc=%d...", sc[i].sc);
|
|
|
331 |
if(ssl) {
|
|
|
332 |
int e;
|
|
|
333 |
if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)");
|
|
|
334 |
if(!e) {
|
|
|
335 |
LOG(5, "SSL shutdown rc=0");
|
|
|
336 |
if((e = SSL_shutdown(sc[i].sslP)) < 0) {
|
|
|
337 |
switch(SSL_get_error(sc[i].sslP, e)) {
|
|
|
338 |
case SSL_ERROR_SYSCALL: {
|
|
|
339 |
long e;
|
|
|
340 |
if(!(e = ERR_get_error()) && errno) SYSERR("SSL shutdown (2)");
|
|
|
341 |
break;
|
|
|
342 |
}
|
|
|
343 |
default:
|
|
|
344 |
SSLERR("SSL shutdown (2)");
|
|
|
345 |
break;
|
|
|
346 |
}
|
|
|
347 |
}
|
|
|
348 |
}
|
|
|
349 |
}
|
|
|
350 |
close(sc[i].sc);
|
|
|
351 |
LOG(4, "closed sc=%d", sc[i].sc);
|
|
|
352 |
sc[i].sc = -1;
|
|
|
353 |
}
|
|
|
354 |
void closeCliTh(void *p) {
|
|
|
355 |
NodeP nP = (NodeP)p;
|
|
|
356 |
char *callerid = nP->deP->debid;
|
|
|
357 |
DebugP deP = new DebugC();
|
|
|
358 |
DEBID("%s CLOSE clients thread", callerid);
|
|
|
359 |
LOG(5, "start...");
|
|
|
360 |
for (int i = 0; i < nP->nodes; i++) if(nP->cliSides[i].sc > -1) nP->closeSocket(i, client);
|
|
|
361 |
LOG(4, "all clients closed");
|
|
|
362 |
}
|
|
|
363 |
void NodeC::closeClients() {
|
|
|
364 |
if(!closing) try { closingThread = thread(closeCliTh, this); } catch(exception e) {
|
|
|
365 |
cout << "closing tread: " << e.what() << '\n'; }
|
|
|
366 |
closing = 1;
|
|
|
367 |
}
|
|
|
368 |
int NodeC::readN(int i) {
|
|
|
369 |
LOG(5, "to read len=%d from sc=%d...", data.dataLen, srvSides[i].sc);
|
|
|
370 |
int n, rest = data.dataLen;
|
|
|
371 |
char *buf = (char*)data.contP;
|
|
|
372 |
while(rest > 0) {
|
|
|
373 |
if(ssl) { if((n = SSL_read(srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); }
|
|
|
374 |
else { if((n = read(srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); }
|
|
|
375 |
if(n == 0) {
|
|
|
376 |
LOG(4, "read EOF");
|
|
|
377 |
return 0;
|
|
|
378 |
}
|
|
|
379 |
else {
|
|
|
380 |
buf += n; rest -= n;
|
|
|
381 |
if(rest > 0) LOG(5, "partly read %d bytes, buf=%p", n, buf);
|
|
|
382 |
}
|
|
|
383 |
}
|
|
|
384 |
if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
|
|
|
385 |
csP->shP->msgs++;
|
|
|
386 |
if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
|
|
|
387 |
LOG(5, "read %d from %u", data.dataLen, data.remPort());
|
|
|
388 |
return (data.dataLen);
|
|
|
389 |
}
|
|
|
390 |
int NodeC::writeN(int i) {
|
|
|
391 |
LOG(5, "to write len=%d to sc=%d...", data.dataLen, cliSides[i].sc);
|
|
|
392 |
int n, rest = data.dataLen;
|
|
|
393 |
char *buf = (char*)data.contP;
|
|
|
394 |
while(rest > 0) {
|
|
|
395 |
if(ssl) { if((n = SSL_write(cliSides[i].sslP, data.contP, data.dataLen)) < 0) SSLERR("socket write"); }
|
|
|
396 |
else { if((n = write(cliSides[i].sc, data.contP, data.dataLen)) < 0) SYSERR("socket write"); }
|
|
|
397 |
buf += n; rest -= n;
|
|
|
398 |
if(rest > 0) LOG(5, "partly written %d bytes", n);
|
|
|
399 |
}
|
|
|
400 |
LOG(5, "written %d", data.dataLen);
|
|
|
401 |
return data.dataLen;
|
|
|
402 |
}
|
|
|
403 |
// Receives one data container on server-side slot `i`; 1 if readN() returned a
// positive count, else 0.
int NodeC::getN(int i) {
    const int received = readN(i);
    return received > 0 ? 1 : 0;
}
|
|
|
404 |
int NodeC::putN(int i) {
|
|
|
405 |
data.remPort(locPort);
|
|
|
406 |
return writeN(i) > 0; }
|
|
|
407 |
int NodeC::next_node() {
|
|
|
408 |
int next;
|
|
|
409 |
if(topo == ring) {
|
|
|
410 |
next = locPort + 1;
|
|
|
411 |
if(next > last) next = first;
|
|
|
412 |
}
|
|
|
413 |
else while((next = first + nodes * ((float)random() / RAND_MAX)) == locPort);
|
|
|
414 |
return next;
|
|
|
415 |
}
|
|
|
416 |
// Default constellation: no debug context, no shared forwarding flag, ring
// topology, and ssl / first / nodes all zero.
ConstellationC::ConstellationC() {
    deP = NULL;
    forwP = NULL;
    topo = ring;
    nodes = 0;
    first = nodes;
    ssl = first;
}
|
|
|
422 |
// Constellation for the given topology and SSL setting. First port and node count
// come from the global configuration (rp0/rn for ring, mp0/mn for mash); the first
// port is then shifted by ssl * 500.
ConstellationC::ConstellationC(topology topo, int ssl) {
    deP = new DebugC();
    DEBID("%sSSL %s", ssl ? "" : "non", topo==mash ? "MASH" : "RING");
    this->topo = topo;
    this->ssl = ssl;
    forwP = NULL;
    if(topo == ring) {
        first = csP->rp0;
        nodes = csP->rn;
    }
    if(topo == mash) {
        first = csP->mp0;
        nodes = csP->mn;
    }
    first += ssl*500;
}
|
|
|
432 |
int ConstellationC::run() {
|
|
|
433 |
int stat = 0, pid = 0, exitRc = EXIT_SUCCESS;
|
|
|
434 |
if(nodes == 0) exit(0);
|
|
|
435 |
if(nodes == 1) { LOG(0, "1 node configuration not implemented"); exit(0); }
|
|
|
436 |
pid_t *pids = new pid_t[nodes];
|
|
|
437 |
LOG(1, "%d nodes starting...", nodes);
|
|
|
438 |
if((forwP = (int*)mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap");
|
|
|
439 |
*forwP = 1;
|
|
|
440 |
for(int port = first; port < first + nodes; port++) {
|
|
|
441 |
if(!(pid = fork())) (new NodeC(this, port))->run();
|
|
|
442 |
else { pids[port-first] = pid; LOG(4, "node %u established in process %u", port, pid); }
|
|
|
443 |
}
|
|
|
444 |
LOG(2, "all nodes established");
|
|
|
445 |
while((pid = wait(&stat)) > 0)
|
|
|
446 |
if(WIFEXITED(stat)) {
|
|
|
447 |
LOG(4, "node process %u ended with exit(%d)", pid, WEXITSTATUS(stat));
|
|
|
448 |
if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE;
|
|
|
449 |
}
|
|
|
450 |
else exitRc = EXIT_FAILURE;
|
|
|
451 |
LOG(1, "ENDED %s", exitRc == EXIT_SUCCESS ? "OK" : "with ERROR");
|
|
|
452 |
exit(exitRc);
|
|
|
453 |
}
|
|
|
454 |
/**
 * Builds the global configuration: sets up the shared counters and their
 * semaphore, applies defaults, then overrides them from the environment.
 *
 * Environment variables read: DEB, T, CEP, CAP, TTL, RP0, MP0, N, SSL, RN, MN, P, RS.
 *
 * @param deP   debug context used by the error macros
 * @param prgnP program path (argv[0]); its directory is the base of the default
 *              certificate path when CEP is not set
 */
CSS::CSS(DebugP deP, char *prgnP) {
    ShareA(shP);
    shP->msgs = 0;
    shP->conns = 0;
    if(sem_init(&shP->counterSem, 1, 1) < 0) SYSERR("sem_init");
    text = "bla bla";
    ttl = 3;
    rp0 = 11000;
    mp0 = 12000;
    rn = 0;
    mn = 0;
    issl = 0;
    const string sslPathSuffP = "../CS";
    caPath = "/home/local/etc/ssl/certs/";
    connThreshold = 77; // connection retries threshold
    connTO = 0.01 * 1000*1000; // connection sleep time in usecs
    selTO = 1; // selection timeout in secs

    if(getArg("DEB") >= 0) DebugC::debug = getArg("DEB");
    if(getenv("T") != NULL) { text = getenv("T"); }
    if(getenv("CEP") != NULL) cePath = getenv("CEP");
    else {
        // dirname() is allowed to modify its argument, so work on a copy and leave
        // the caller's argv[0] intact.
        string prgCopy(prgnP != NULL ? prgnP : "");
        cePath = dirname(&prgCopy[0]);
        cePath += "/";
        cePath += sslPathSuffP;
    }
    if(getenv("CAP") != NULL) caPath = getenv("CAP");
    if(getArg("TTL") > 0) ttl = getArg("TTL");
    if(getArg("RP0") > 0) rp0 = getArg("RP0");
    if(getArg("MP0") > 0) mp0 = getArg("MP0");
    if(getArg("N") >= 0) { mn = getArg("N"); rn = mn; }
    if(getArg("SSL") >= 0) issl = getArg("SSL");
    if(getArg("RN") >= 0) rn = getArg("RN");
    if(getArg("MN") >= 0) mn = getArg("MN");
    shP->act = rn + mn; // initialize active node processes counter
    if(issl > 1) shP->act += shP->act;

    // `pace` is handed to nanosleep() and logged even when P is not set, so it
    // needs a defined value (zero = no delay).
    pacing = 0;
    pace.tv_sec = 0;
    pace.tv_nsec = 0;
    if(getenv("P") != NULL) {
        double d = atof(getenv("P"));
        if(d > 0) {
            pace.tv_sec=(time_t)trunc(d);
            pace.tv_nsec=(d-pace.tv_sec)*1000*1000*1000;
            if(pace.tv_sec > 0 || pace.tv_nsec > 0) pacing = 1;
        }
    }
    if(getArg("RS") >= 0) srandom(getArg("RS"));
}
|
|
|
495 |
int main(int argc, char *argv[]) {
|
|
|
496 |
DebugC::debug_init(argv[0]);
|
|
|
497 |
DebugP deP = new DebugC();
|
|
|
498 |
DEBID("client/server demo");
|
|
|
499 |
csP = new CSS(deP, argv[0]);
|
|
|
500 |
LOG(1, "pgm=%s, ttl=%u, pace=%lu.%03lu, seed=%u, SSL=%u, debug=%d",\
|
|
|
501 |
argv[0], csP->ttl, csP->pace.tv_sec, csP->pace.tv_nsec/(1000*1000), getArg("RS"), csP->issl, DebugC::debug);
|
|
|
502 |
if(csP->issl > 0) LOG(3, "certs path=%s, CA certs path=%s", csP->cePath.c_str(), csP->caPath.c_str());
|
|
|
503 |
if(csP->issl < 2) {
|
|
|
504 |
if(!fork()) (new ConstellationC(ring, csP->issl))->run();
|
|
|
505 |
if(!fork()) (new ConstellationC(mash, csP->issl))->run();
|
|
|
506 |
} else for(int ssl = 0; ssl < csP->issl; ssl++) {
|
|
|
507 |
if(!fork()) (new ConstellationC(ring, ssl))->run();
|
|
|
508 |
if(!fork()) (new ConstellationC(mash, ssl))->run();
|
|
|
509 |
}
|
|
|
510 |
int stat, exitRc = EXIT_SUCCESS;
|
|
|
511 |
while(wait(&stat) > 0)
|
|
|
512 |
if(WIFEXITED(stat)) {
|
|
|
513 |
LOG(5, "constellation ended with exit(%d)", WEXITSTATUS(stat));
|
|
|
514 |
if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE;
|
|
|
515 |
}
|
|
|
516 |
else exitRc = EXIT_FAILURE;
|
|
|
517 |
LOG(1, "%s end, forwards=%d, connections=%d", exitRc == EXIT_SUCCESS ? "NORMAL" : "BAD", csP->shP->msgs, csP->shP->conns);
|
|
|
518 |
exit(exitRc);
|
|
|
519 |
}
|