|
0
|
1 |
#include "CS.h"
|
|
|
2 |
|
|
|
3 |
/* State of the node run by this process; Node() points it at its local NodeT. */
NodeP thisP;
|
|
|
4 |
/* File-scope debug context; Node() sets it to &thisP->debug. Some functions
 * in this file declare a local deP that shadows this one. */
DebugP deP;
|
|
|
5 |
/* Message descriptor used by readN()/writeN(); Node() sets it to &thisP->data. */
DataP dataP;
|
|
|
6 |
|
|
|
7 |
/* Signal handler that deliberately does nothing with the delivered signal. */
void sighandle(int sig)
{
    (void)sig; /* unused on purpose */
}
|
|
|
8 |
|
|
|
9 |
/*
 * Create the node's listening socket and store it in thisP->ssc.
 *
 * Resolves a passive IPv4/TCP address for thisP->locPort, creates the socket,
 * enables SO_REUSEADDR, binds it and starts listening with a backlog of 1.
 * Failures are reported through the project's HARDERR/SYSERR macros
 * (presumably fatal; their definitions are not visible in this file).
 *
 * The empty parameter list is kept as in the original definition because the
 * file calls this function with an argument.
 */
void bindN() {
    LOG(4, "binding...");

    /* The hints only need to live for the getaddrinfo() call, so they sit on
     * the stack. Previously a heap copy was allocated and then lost when
     * getaddrinfo() overwrote the only pointer to it. */
    struct addrinfo hints;
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = 0;
    hints.ai_flags = AI_PASSIVE;

    char service[64];
    snprintf(service, sizeof service, "%d", thisP->locPort);

    struct addrinfo *res = NULL;
    int e;
    if ((e = getaddrinfo(NULL, service, &hints, &res)) != 0) {
        HARDERR(gai_strerror(e));
        return; /* only reached if HARDERR returns; res is not usable */
    }
    GAI(4, res);

    if ((thisP->ssc = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0) {
        SYSERR("socket alloc");
    }

    int opt = 1;
    if (setsockopt(thisP->ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
        SYSERR("set socket options");
    }
    if (bind(thisP->ssc, res->ai_addr, res->ai_addrlen) < 0) {
        SYSERR("bind");
    }

    /* The resolved address list is no longer needed once the socket is bound. */
    freeaddrinfo(res);

    if (listen(thisP->ssc, 1) < 0) {
        SYSERR("listen");
    }
    LOG(2, "bound to %d", thisP->locPort);
}
|
|
|
29 |
/*
 * Open the client-side connection in slot i to the node listening on remPort.
 *
 * i        index into thisP->cliSides that receives the socket (and SSL object)
 * remPort  TCP port on "localhost" to connect to
 *
 * connect() is attempted up to csP->connThreshold times; a refused connection
 * is retried after usleep(csP->connTO), any other error goes to SYSERR. If no
 * attempt succeeds the process exits with EXIT_FAILURE. On success the shared
 * connection counter is incremented under counterSem and, when thisP->ssl is
 * set, a TLS client handshake is performed on the new socket.
 */
void conn(int i, int remPort) {
    /* Local debug context; shadows the file-scope deP for the macros below. */
    DebugT debug, *deP = &debug;
    DEBID("%s to %u", thisP->debug.id, remPort);

    thisP->cliSides[i].remPort = remPort;
    LOG(4, "connecting to %u...", remPort);

    /* Stack hints: nothing to free, and the pointer is not clobbered. */
    struct addrinfo hints;
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = 0;
    hints.ai_flags = 0;

    /* Large enough for any int, not just a valid 16-bit port number. */
    char port[16];
    snprintf(port, sizeof port, "%d", remPort);

    struct addrinfo *res = NULL;
    int e;
    if ((e = getaddrinfo("localhost", port, &hints, &res)) != 0) {
        HARDERR(gai_strerror(e));
        return; /* only reached if HARDERR returns; res is not usable */
    }
    GAI(3, res);

    if ((thisP->cliSides[i].sc = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0) {
        SYSERR("socket alloc");
    }

    /* Track success explicitly: a connect that succeeds on the last permitted
     * attempt must not be mistaken for an exhausted retry budget.
     * NOTE(review): POSIX leaves the socket state unspecified after a failed
     * connect(); retrying on the same descriptor is kept from the original. */
    int attempts = 0;
    int connected = 0;
    while (attempts < csP->connThreshold) {
        attempts++;
        if (connect(thisP->cliSides[i].sc, res->ai_addr, res->ai_addrlen) < 0) {
            if (errno != ECONNREFUSED) {
                SYSERR("connect");
            }
            usleep(csP->connTO);
        } else {
            connected = 1;
            break;
        }
    }
    freeaddrinfo(res);

    if (!connected) {
        LOG(0, "connection refused threshold %d reached", csP->connThreshold);
        exit(EXIT_FAILURE);
    }

    if (sem_wait(&(csP->shP->counterSem)) < 0) {
        SYSERR("sem_wait");
    }
    csP->shP->conns++;
    if (sem_post(&(csP->shP->counterSem)) < 0) {
        SYSERR("sem_post");
    }

    /* Peer address is only needed for the log line; keep it on the stack. */
    struct sockaddr_storage peer;
    socklen_t peerLen = sizeof peer;
    memset(&peer, 0, sizeof peer);
    if (getpeername(thisP->cliSides[i].sc, (struct sockaddr *)&peer, &peerLen) < 0) {
        SYSERR("getpeername");
    }
    LOG(4, "peer: %s on sc=%d", gpa((struct sockaddr *)&peer), thisP->cliSides[i].sc);

    if (thisP->ssl) {
        ERR_clear_error();
        if (!(thisP->cliSides[i].sslP = SSL_new(thisP->ctx))) {
            SSLERR("new SSL");
        }
        if (!SSL_set_fd(thisP->cliSides[i].sslP, thisP->cliSides[i].sc)) {
            SSLERR("client SSL set fd");
        }
        if ((e = SSL_connect(thisP->cliSides[i].sslP)) < 1) {
            switch (SSL_get_error(thisP->cliSides[i].sslP, e)) {
            case SSL_ERROR_SYSCALL: SYSERR("SSL connect"); break;
            default: SSLERR("SSL connect"); break;
            }
        }
    }
    LOG(2, "connected via sc=%d after %d retries", thisP->cliSides[i].sc, attempts - 1);
}
|
|
|
79 |
void acc(int i) {
|
|
|
80 |
LOG(4, "accepting...");
|
|
|
81 |
if((thisP->srvSides[i].sc = accept(thisP->ssc, NULL, NULL)) < 0) SYSERR("accept");
|
|
|
82 |
socklen_t l = sizeof(struct sockaddr);
|
|
|
83 |
struct sockaddr *sa = malloc(l);
|
|
|
84 |
if(getpeername(thisP->srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
|
|
|
85 |
LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->srvSides[i].sc); free(sa);
|
|
|
86 |
if(thisP->ssl) {
|
|
|
87 |
int e;
|
|
|
88 |
ERR_clear_error();
|
|
|
89 |
if(!(thisP->srvSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL");
|
|
|
90 |
if(!SSL_set_fd(thisP->srvSides[i].sslP, thisP->srvSides[i].sc)) SSLERR("server SSL set fd");
|
|
|
91 |
if((e = SSL_accept(thisP->srvSides[i].sslP)) < 1) {
|
|
|
92 |
switch(SSL_get_error(thisP->srvSides[i].sslP, e)) {
|
|
|
93 |
case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break;
|
|
|
94 |
default: SSLERR("SSL accept"); break;
|
|
|
95 |
}
|
|
|
96 |
}
|
|
|
97 |
}
|
|
|
98 |
LOG(2, "accepted");
|
|
|
99 |
}
|
|
|
100 |
/*
 * Close the socket in slot i of the server-side or client-side table.
 *
 * i     slot index
 * side  non-zero selects thisP->srvSides, zero selects thisP->cliSides
 *
 * With SSL enabled, SSL_shutdown() is called; if it returns 0 (the shutdown
 * is not yet complete) it is called a second time. On the second call a
 * SSL_ERROR_SYSCALL with an empty OpenSSL error queue and errno == 0 is
 * tolerated silently, as in the original. The SSL object is then freed, the
 * descriptor closed and the slot marked free (sc = -1).
 */
void closeN(int i, nodeside side) {
    SocketP sc = side ? thisP->srvSides : thisP->cliSides;

    LOG(5, "closing sc=%d...", sc[i].sc);
    if (thisP->ssl) {
        int e = SSL_shutdown(sc[i].sslP);
        if (e < 0) {
            /* Classify the failure the same way the handshake paths in this
             * file do, instead of always reporting it as a system error. */
            switch (SSL_get_error(sc[i].sslP, e)) {
            case SSL_ERROR_SYSCALL: SYSERR("SSL shutdown (1)"); break;
            default: SSLERR("SSL shutdown (1)"); break;
            }
        }
        if (!e) {
            LOG(5, "SSL shutdown rc=0");
            if ((e = SSL_shutdown(sc[i].sslP)) < 0) {
                switch (SSL_get_error(sc[i].sslP, e)) {
                case SSL_ERROR_SYSCALL:
                    /* Report only when the error queue is empty and errno is
                     * set; every other SYSCALL case is ignored. */
                    if (!(e = ERR_get_error())) {
                        if (errno) {
                            SYSERR("SSL shutdown (2)");
                        }
                    }
                    break;
                default: SSLERR("SSL shutdown (2)"); break;
                }
            }
        }
        /* Release the SSL object; without this every closed connection (and
         * every reuse of the slot by SSL_new) leaked it. */
        SSL_free(sc[i].sslP);
        sc[i].sslP = NULL;
    }
    close(sc[i].sc);
    LOG(4, "closed sc=%d", sc[i].sc);
    sc[i].sc = -1;
}
|
|
|
126 |
void *close_clients() {
|
|
|
127 |
DebugT debug, *deP = &debug;
|
|
|
128 |
DEBID("%s CLOSE clients", thisP->debug.id);
|
|
|
129 |
LOG(5, "start...");
|
|
|
130 |
for (int i = 0; i < thisP->nodes; i++) if(thisP->cliSides[i].sc > -1) closeN(i, client);
|
|
|
131 |
LOG(4, "all clients closed");
|
|
|
132 |
pthread_exit(NULL);
|
|
|
133 |
}
|
|
|
134 |
void close_node() {
|
|
|
135 |
if(!thisP->closing) {
|
|
|
136 |
thisP->closingThread = 0;
|
|
|
137 |
if(pthread_create(&thisP->closingThread, NULL, &close_clients, NULL) != 0) SYSERR("create thread");
|
|
|
138 |
thisP->closing = 1;
|
|
|
139 |
}
|
|
|
140 |
}
|
|
|
141 |
int readN(int i) {
|
|
|
142 |
LOG(5, "ready to read len=%d from sc=%d...", dataP->dataLen, thisP->srvSides[i].sc);
|
|
|
143 |
int n, rest = dataP->dataLen;
|
|
|
144 |
void *buf = dataP->contP;
|
|
|
145 |
while(rest > 0) {
|
|
|
146 |
if(thisP->ssl) { if((n = SSL_read(thisP->srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); }
|
|
|
147 |
else { if((n = read(thisP->srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); }
|
|
|
148 |
if(n == 0) {
|
|
|
149 |
LOG(4, "read EOF");
|
|
|
150 |
return 0;
|
|
|
151 |
}
|
|
|
152 |
else {
|
|
|
153 |
buf += n; rest -= n;
|
|
|
154 |
if(rest > 0) LOG(5, "partly read %d bytes", n);
|
|
|
155 |
}
|
|
|
156 |
}
|
|
|
157 |
if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
|
|
|
158 |
csP->shP->msgs++;
|
|
|
159 |
if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
|
|
|
160 |
LOG(5, "read %d from %u", dataP->dataLen, dataP->contP->hdr.listPort);
|
|
|
161 |
return dataP->dataLen;
|
|
|
162 |
}
|
|
|
163 |
int writeN(int i) {
|
|
|
164 |
DebugT debug, *deP = &debug;
|
|
|
165 |
DEBID("%s to %u", thisP->debug.id, thisP->cliSides[i].remPort);
|
|
|
166 |
LOG(5, "ready to write len=%d to sc=%d...", dataP->dataLen, thisP->cliSides[i].sc);
|
|
|
167 |
int n, rest = dataP->dataLen;
|
|
|
168 |
void *buf = dataP->contP;
|
|
|
169 |
while(rest > 0) {
|
|
|
170 |
if(thisP->ssl) { if((n = SSL_write(thisP->cliSides[i].sslP, buf, rest)) < 0) SSLERR("socket write"); }
|
|
|
171 |
else { if((n = write(thisP->cliSides[i].sc, buf, rest)) < 0) SYSERR("socket write"); }
|
|
|
172 |
buf += n; rest -= n;
|
|
|
173 |
if(rest > 0) LOG(5, "partly written %d bytes", n);
|
|
|
174 |
}
|
|
|
175 |
LOG(5, "written %d", dataP->dataLen);
|
|
|
176 |
return dataP->dataLen;
|
|
|
177 |
}
|
|
|
178 |
/*
 * Receive one message on server slot i via readN().
 * Returns 1 if readN() reported a positive byte count, otherwise 0.
 */
int getN(int i) {
    int received = readN(i);
    return received > 0 ? 1 : 0;
}
|
|
|
181 |
int putN(int i) {
|
|
|
182 |
dataP->contP->hdr.listPort = thisP->locPort;
|
|
|
183 |
return writeN(i) > 0;
|
|
|
184 |
}
|
|
|
185 |
int next_node() {
|
|
|
186 |
int next;
|
|
|
187 |
if(thisP->topo == ring) {
|
|
|
188 |
next = thisP->locPort + 1;
|
|
|
189 |
if(next > thisP->last) next = thisP->first;
|
|
|
190 |
}
|
|
|
191 |
else while((next = thisP->first + thisP->nodes * ((float)random() / RAND_MAX)) == thisP->locPort);
|
|
|
192 |
return next;
|
|
|
193 |
}
|
|
|
194 |
void forward(int sci) {
|
|
|
195 |
int next, scn;
|
|
|
196 |
char digest[24];
|
|
|
197 |
if(getN(sci)) {
|
|
|
198 |
LOG(5, "received data from %u", remPortData(dataP));
|
|
|
199 |
if(thisP->kicker) {
|
|
|
200 |
LOG(4, "received from node %u: %s, ttl=%d",
|
|
|
201 |
remPortData(dataP), digest24Data(dataP, digest), ttlData(dataP));
|
|
|
202 |
// if(ttlData(dataP) == 2) sabotageData(dataP);
|
|
|
203 |
// if(ttlData(dataP) == 2) errno=0, SYSERR("signal test");
|
|
|
204 |
if(dttlData(dataP) <= 0) {
|
|
|
205 |
LOG(1, "received after passing all %s: %s", thisP->topo==mash ? "mashes" : "rings",digest24Data(dataP, digest));
|
|
|
206 |
close_node();
|
|
|
207 |
*(thisP->forw) = 0;
|
|
|
208 |
LOG(4, "leaving forward closing");
|
|
|
209 |
return;
|
|
|
210 |
}
|
|
|
211 |
}
|
|
|
212 |
next = next_node(); scn = next - thisP->first;
|
|
|
213 |
LOG(5, "forwarding to %d, len=%d, ttl=%d --->", next, dataP->dataLen, ttlData(dataP));
|
|
|
214 |
if(thisP->cliSides[scn].sc < 0) conn(scn, next);
|
|
|
215 |
if(*(thisP->forw) && csP->pacing) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); }
|
|
|
216 |
putN(scn);
|
|
|
217 |
LOG(5, "forwarded to %u", next);
|
|
|
218 |
}
|
|
|
219 |
else {
|
|
|
220 |
close_node();
|
|
|
221 |
closeN(sci, server);
|
|
|
222 |
}
|
|
|
223 |
return;
|
|
|
224 |
}
|
|
|
225 |
void main_loop() {
|
|
|
226 |
sigset_t pacing;
|
|
|
227 |
sigemptyset(&pacing);
|
|
|
228 |
sigaddset(&pacing, PACING);
|
|
|
229 |
union { // simple select mask debug
|
|
|
230 |
fd_set rs;
|
|
|
231 |
uint mask;
|
|
|
232 |
} u;
|
|
|
233 |
int nfds;
|
|
|
234 |
FD_ZERO(&(u.rs)); nfds = 0;
|
|
|
235 |
if(*(thisP->forw)) {
|
|
|
236 |
FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; }
|
|
|
237 |
while(nfds) {
|
|
|
238 |
struct timeval t = {csP->selTO, 0};
|
|
|
239 |
LOG(5, "selecting, mask=%08x", u.mask);
|
|
|
240 |
int rc;
|
|
|
241 |
rc = select(nfds, &(u.rs), NULL, NULL, &t);
|
|
|
242 |
if(rc < 0 && errno != EINTR) SYSERR("select");
|
|
|
243 |
if(rc > 0) {
|
|
|
244 |
LOG(5, "return from select, mask=%08x", u.mask);
|
|
|
245 |
if(FD_ISSET(thisP->ssc, &(u.rs))) { // ssc posted: accept & forward
|
|
|
246 |
int i;
|
|
|
247 |
for(i=0; thisP->srvSides[i].sc > -1 && i < thisP->nodes; i++); // find unused slot for accept
|
|
|
248 |
if(i == thisP->nodes) HARDERR("can't accept, all slots in use");
|
|
|
249 |
LOG(5, "slot for accept=%d", i);
|
|
|
250 |
acc(i);
|
|
|
251 |
forward(i);
|
|
|
252 |
}
|
|
|
253 |
else // check which connected socket is posted
|
|
|
254 |
for(int i = 0; i < thisP->nodes; i++)
|
|
|
255 |
if(thisP->srvSides[i].sc > -1 && FD_ISSET(thisP->srvSides[i].sc, &(u.rs))) forward(i);
|
|
|
256 |
}
|
|
|
257 |
FD_ZERO(&(u.rs)); nfds = 0;
|
|
|
258 |
if(*(thisP->forw)) { FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; }
|
|
|
259 |
for(int i = 0; i < thisP->nodes; i++) { // mask all connected client side sockets for select
|
|
|
260 |
int sc = thisP->srvSides[i].sc;
|
|
|
261 |
if(sc > -1) { FD_SET(sc, &(u.rs)); if(sc >= nfds) nfds = sc + 1; }
|
|
|
262 |
}
|
|
|
263 |
}
|
|
|
264 |
}
|
|
|
265 |
void Node(topology topo, int *forw, int port, int first, int n, int ssl) {
|
|
|
266 |
NodeT this;
|
|
|
267 |
thisP = &this;
|
|
|
268 |
deP = &(thisP->debug);
|
|
|
269 |
DEBID("%sSSL %s node %d", ssl ? "" : "non", topo==mash ? "MASH" : "RING", port);
|
|
|
270 |
LOG(4, "initializing...");
|
|
|
271 |
dataP = &thisP->data;
|
|
|
272 |
Data(dataP, deP);
|
|
|
273 |
thisP->topo = topo;
|
|
|
274 |
thisP->locPort = port;
|
|
|
275 |
thisP->first = first;
|
|
|
276 |
thisP->last = first + n - 1;
|
|
|
277 |
thisP->nodes = n;
|
|
|
278 |
thisP->cliSides = malloc(n*sizeof(SocketT));
|
|
|
279 |
thisP->srvSides = malloc(n*sizeof(SocketT));
|
|
|
280 |
for(int k=0; k<n; k++) thisP->cliSides[k].sc = thisP->srvSides[k].sc = -1;
|
|
|
281 |
thisP->kicker = (port == first);
|
|
|
282 |
thisP->forw = forw;
|
|
|
283 |
thisP->nodeIdx = port - first;
|
|
|
284 |
thisP->closing = 0;
|
|
|
285 |
thisP->ssl = ssl;
|
|
|
286 |
if(thisP->ssl) {
|
|
|
287 |
char s[64];
|
|
|
288 |
SSL_load_error_strings();
|
|
|
289 |
SSL_library_init();
|
|
|
290 |
LOG(4, "setting SSL contex...");
|
|
|
291 |
if(!(thisP->ctx = SSL_CTX_new(TLS_method()))) SSLERR("new SSL CTX");
|
|
|
292 |
SSL_CTX_set_mode(thisP->ctx, SSL_MODE_AUTO_RETRY);
|
|
|
293 |
SSL_CTX_set_verify(thisP->ctx, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
|
|
|
294 |
sprintf(s, "%s/keys/%d.key", csP->ceP, thisP->locPort);
|
|
|
295 |
if(SSL_CTX_use_PrivateKey_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file");
|
|
|
296 |
sprintf(s, "%s/certs/%d.pem", csP->ceP, thisP->locPort);
|
|
|
297 |
LOG(5, "SSL private key used: %s", s);
|
|
|
298 |
if(SSL_CTX_use_certificate_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file");
|
|
|
299 |
LOG(5, "SSL certificate used: %s", s);
|
|
|
300 |
if(SSL_CTX_load_verify_locations(thisP->ctx, NULL, csP->caP) != 1) SSLERR("hh's thrusted certs path");
|
|
|
301 |
}
|
|
|
302 |
LOG(5, "initalized");
|
|
|
303 |
|
|
|
304 |
bindN(thisP);
|
|
|
305 |
if(thisP->kicker) {
|
|
|
306 |
loadData(dataP, csP->ttl, csP->text);
|
|
|
307 |
int sci, next;
|
|
|
308 |
next = next_node(); sci = next - thisP->first;
|
|
|
309 |
char digest[24];
|
|
|
310 |
LOG(1, "KICKER: ready to initial send %s, len=%d to node %d", digest24Data(dataP, digest), dataP->dataLen, next);
|
|
|
311 |
conn(sci, next);
|
|
|
312 |
putN(sci);
|
|
|
313 |
}
|
|
|
314 |
|
|
|
315 |
main_loop();
|
|
|
316 |
|
|
|
317 |
LOG(5, "closing ssc");
|
|
|
318 |
close(thisP->ssc);
|
|
|
319 |
if(thisP->closing) { // wait for closing thread
|
|
|
320 |
if(pthread_join(thisP->closingThread, NULL) != 0) SYSERR("join closing thread"); }
|
|
|
321 |
struct sigaction sigact;
|
|
|
322 |
sigfillset(&sigact.sa_mask);
|
|
|
323 |
sigact.sa_handler=sighandle;
|
|
|
324 |
if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction");
|
|
|
325 |
if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait");
|
|
|
326 |
int active = --csP->shP->act;
|
|
|
327 |
if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post");
|
|
|
328 |
if(active > 0) pause();
|
|
|
329 |
else kill(0, SIGUSR2);
|
|
|
330 |
int exitRc = EXIT_SUCCESS;
|
|
|
331 |
if(thisP->kicker && !chkData(dataP)) {
|
|
|
332 |
SOFTERR("INPUT AND OUTPUT DIFFER");
|
|
|
333 |
exitRc = EXIT_FAILURE; }
|
|
|
334 |
LOG(2, "ended");
|
|
|
335 |
exit(exitRc);
|
|
|
336 |
}
|