|
1 #include "CS.h" |
|
2 |
|
3 NodeP thisP; |
|
4 DebugP deP; |
|
5 DataP dataP; |
|
6 |
|
/*
 * Signal handler that deliberately does nothing.
 * Node() installs it for SIGUSR2 before waiting for the other nodes.
 */
void sighandle(int sig) {
    (void)sig;   /* the signal number is not needed */
}
|
8 |
|
/*
 * Create the listening socket of this node.
 *
 * Resolves a passive IPv4/TCP address for thisP->locPort, creates the socket,
 * sets SO_REUSEADDR, binds it and starts listening with a backlog of 1.
 * The descriptor is stored in thisP->ssc. Failures are reported through
 * HARDERR (resolver) or SYSERR (system calls).
 *
 * The parameter list is intentionally left empty (not "void") because the
 * caller in this file invokes it with an argument.
 */
void bindN() {
    LOG(4, "binding...");

    /* The hints live on the stack: getaddrinfo() only reads them, and keeping
     * them apart from the result lets the result list be released below. */
    struct addrinfo hints;
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = 0;
    hints.ai_flags = AI_PASSIVE;

    char service[64];
    snprintf(service, sizeof service, "%d", thisP->locPort);

    struct addrinfo *sa = NULL;
    int e;
    if((e = getaddrinfo(NULL, service, &hints, &sa)) != 0) HARDERR(gai_strerror(e));
    GAI(4, sa);

    if((thisP->ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc");
    int opt = 1;
    if(setsockopt(thisP->ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options");
    if(bind(thisP->ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind");
    if(listen(thisP->ssc, 1) < 0) SYSERR("listen");

    /* The resolved address is no longer needed once the socket is bound. */
    freeaddrinfo(sa);

    LOG(2, "bound to %d", thisP->locPort);
}
|
/*
 * Open the client-side connection in slot i to the node listening on remPort.
 *
 * i        index into thisP->cliSides that receives the socket (and SSL handle)
 * remPort  TCP port on "localhost" to connect to
 *
 * connect() is attempted at most csP->connThreshold times. After an attempt
 * refused with ECONNREFUSED the function sleeps csP->connTO microseconds
 * (usleep) and tries again; any other connect error goes to SYSERR. If no
 * attempt succeeds the process exits with EXIT_FAILURE.
 * On success csP->shP->conns is incremented under counterSem and, when
 * thisP->ssl is set, a TLS client handshake is run on the new socket.
 */
void conn(int i, int remPort) {
    /* Local debug context shadowing the global deP for this call. */
    DebugT debug, *deP = &debug;
    DEBID("%s to %u", thisP->debug.id, remPort);
    thisP->cliSides[i].remPort = remPort;
    int e;
    LOG(4, "connecting to %u...", remPort);

    /* Stack hints, separate from the result so the result can be freed. */
    struct addrinfo hints;
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = 0;
    hints.ai_flags = 0;

    char port[16];   /* large enough for any int, including the sign */
    snprintf(port, sizeof port, "%d", remPort);

    struct addrinfo *ai = NULL;
    if((e = getaddrinfo("localhost", port, &hints, &ai)) != 0) HARDERR(gai_strerror(e));
    GAI(3, ai);
    if((thisP->cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc");

    /* Track success explicitly: a connect that succeeds on the very last
     * allowed attempt must not be mistaken for an exhausted retry budget. */
    const int limit = csP->connThreshold;
    int attempts = 0;
    int connected = 0;
    while(!connected && attempts < limit) {
        attempts++;
        if(connect(thisP->cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) {
            if(errno != ECONNREFUSED) SYSERR("connect");
            usleep(csP->connTO);
        }
        else connected = 1;
    }
    freeaddrinfo(ai);

    if(!connected) {
        LOG(0, "connection refused threshold %d reached", csP->connThreshold);
        exit(EXIT_FAILURE);
    }

    if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
    csP->shP->conns++;
    if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");

    /* Peer address is only needed for the log line; keep it on the stack. */
    struct sockaddr peer;
    socklen_t peerLen = sizeof peer;
    if(getpeername(thisP->cliSides[i].sc, &peer, &peerLen) < 0) SYSERR("getpeername");
    LOG(4, "peer: %s on sc=%d", gpa(&peer), thisP->cliSides[i].sc);

    if(thisP->ssl) {
        ERR_clear_error();
        if(!(thisP->cliSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL");
        if(!SSL_set_fd(thisP->cliSides[i].sslP, thisP->cliSides[i].sc)) SSLERR("client SSL set fd");
        if((e = SSL_connect(thisP->cliSides[i].sslP)) < 1) {
            switch(SSL_get_error(thisP->cliSides[i].sslP, e)) {
                case SSL_ERROR_SYSCALL: SYSERR("SSL connect"); break;
                default: SSLERR("SSL connect"); break;
            }
        }
    }
    /* attempts - 1 == number of refused attempts before the successful one */
    LOG(2, "connected via sc=%d after %d retries", thisP->cliSides[i].sc, attempts - 1);
}
|
79 void acc(int i) { |
|
80 LOG(4, "accepting..."); |
|
81 if((thisP->srvSides[i].sc = accept(thisP->ssc, NULL, NULL)) < 0) SYSERR("accept"); |
|
82 socklen_t l = sizeof(struct sockaddr); |
|
83 struct sockaddr *sa = malloc(l); |
|
84 if(getpeername(thisP->srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername"); |
|
85 LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->srvSides[i].sc); free(sa); |
|
86 if(thisP->ssl) { |
|
87 int e; |
|
88 ERR_clear_error(); |
|
89 if(!(thisP->srvSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL"); |
|
90 if(!SSL_set_fd(thisP->srvSides[i].sslP, thisP->srvSides[i].sc)) SSLERR("server SSL set fd"); |
|
91 if((e = SSL_accept(thisP->srvSides[i].sslP)) < 1) { |
|
92 switch(SSL_get_error(thisP->srvSides[i].sslP, e)) { |
|
93 case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break; |
|
94 default: SSLERR("SSL accept"); break; |
|
95 } |
|
96 } |
|
97 } |
|
98 LOG(2, "accepted"); |
|
99 } |
|
100 void closeN(int i, nodeside side) { |
|
101 SocketP sc; |
|
102 if(side) sc = thisP->srvSides; else sc = thisP->cliSides; |
|
103 LOG(5, "closing sc=%d...", sc[i].sc); |
|
104 if(thisP->ssl) { |
|
105 int e; |
|
106 if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)"); |
|
107 if(!e) { |
|
108 LOG(5, "SSL shutdown rc=0"); |
|
109 if((e = SSL_shutdown(sc[i].sslP)) < 0) { |
|
110 switch(SSL_get_error(sc[i].sslP, e)) { |
|
111 case SSL_ERROR_SYSCALL: |
|
112 if(!(e = ERR_get_error())) { |
|
113 if(errno) SYSERR("SSL shutdown (2)"); |
|
114 break; |
|
115 } |
|
116 break; |
|
117 default: SSLERR("SSL shutdown (2)"); break; |
|
118 } |
|
119 } |
|
120 } |
|
121 } |
|
122 close(sc[i].sc); |
|
123 LOG(4, "closed sc=%d", sc[i].sc); |
|
124 sc[i].sc = -1; |
|
125 } |
|
126 void *close_clients() { |
|
127 DebugT debug, *deP = &debug; |
|
128 DEBID("%s CLOSE clients", thisP->debug.id); |
|
129 LOG(5, "start..."); |
|
130 for (int i = 0; i < thisP->nodes; i++) if(thisP->cliSides[i].sc > -1) closeN(i, client); |
|
131 LOG(4, "all clients closed"); |
|
132 pthread_exit(NULL); |
|
133 } |
|
134 void close_node() { |
|
135 if(!thisP->closing) { |
|
136 thisP->closingThread = 0; |
|
137 if(pthread_create(&thisP->closingThread, NULL, &close_clients, NULL) != 0) SYSERR("create thread"); |
|
138 thisP->closing = 1; |
|
139 } |
|
140 } |
|
141 int readN(int i) { |
|
142 LOG(5, "ready to read len=%d from sc=%d...", dataP->dataLen, thisP->srvSides[i].sc); |
|
143 int n, rest = dataP->dataLen; |
|
144 void *buf = dataP->contP; |
|
145 while(rest > 0) { |
|
146 if(thisP->ssl) { if((n = SSL_read(thisP->srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); } |
|
147 else { if((n = read(thisP->srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); } |
|
148 if(n == 0) { |
|
149 LOG(4, "read EOF"); |
|
150 return 0; |
|
151 } |
|
152 else { |
|
153 buf += n; rest -= n; |
|
154 if(rest > 0) LOG(5, "partly read %d bytes", n); |
|
155 } |
|
156 } |
|
157 if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait"); |
|
158 csP->shP->msgs++; |
|
159 if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); |
|
160 LOG(5, "read %d from %u", dataP->dataLen, dataP->contP->hdr.listPort); |
|
161 return dataP->dataLen; |
|
162 } |
|
163 int writeN(int i) { |
|
164 DebugT debug, *deP = &debug; |
|
165 DEBID("%s to %u", thisP->debug.id, thisP->cliSides[i].remPort); |
|
166 LOG(5, "ready to write len=%d to sc=%d...", dataP->dataLen, thisP->cliSides[i].sc); |
|
167 int n, rest = dataP->dataLen; |
|
168 void *buf = dataP->contP; |
|
169 while(rest > 0) { |
|
170 if(thisP->ssl) { if((n = SSL_write(thisP->cliSides[i].sslP, buf, rest)) < 0) SSLERR("socket write"); } |
|
171 else { if((n = write(thisP->cliSides[i].sc, buf, rest)) < 0) SYSERR("socket write"); } |
|
172 buf += n; rest -= n; |
|
173 if(rest > 0) LOG(5, "partly written %d bytes", n); |
|
174 } |
|
175 LOG(5, "written %d", dataP->dataLen); |
|
176 return dataP->dataLen; |
|
177 } |
|
/*
 * Receive one message on server-side slot i.
 * Returns 1 when readN() delivered data, 0 otherwise.
 */
int getN(int i) {
    int got = readN(i);
    return got > 0 ? 1 : 0;
}
|
181 int putN(int i) { |
|
182 dataP->contP->hdr.listPort = thisP->locPort; |
|
183 return writeN(i) > 0; |
|
184 } |
|
185 int next_node() { |
|
186 int next; |
|
187 if(thisP->topo == ring) { |
|
188 next = thisP->locPort + 1; |
|
189 if(next > thisP->last) next = thisP->first; |
|
190 } |
|
191 else while((next = thisP->first + thisP->nodes * ((float)random() / RAND_MAX)) == thisP->locPort); |
|
192 return next; |
|
193 } |
|
194 void forward(int sci) { |
|
195 int next, scn; |
|
196 char digest[24]; |
|
197 if(getN(sci)) { |
|
198 LOG(5, "received data from %u", remPortData(dataP)); |
|
199 if(thisP->kicker) { |
|
200 LOG(4, "received from node %u: %s, ttl=%d", |
|
201 remPortData(dataP), digest24Data(dataP, digest), ttlData(dataP)); |
|
202 // if(ttlData(dataP) == 2) sabotageData(dataP); |
|
203 // if(ttlData(dataP) == 2) errno=0, SYSERR("signal test"); |
|
204 if(dttlData(dataP) <= 0) { |
|
205 LOG(1, "received after passing all %s: %s", thisP->topo==mash ? "mashes" : "rings",digest24Data(dataP, digest)); |
|
206 close_node(); |
|
207 *(thisP->forw) = 0; |
|
208 LOG(4, "leaving forward closing"); |
|
209 return; |
|
210 } |
|
211 } |
|
212 next = next_node(); scn = next - thisP->first; |
|
213 LOG(5, "forwarding to %d, len=%d, ttl=%d --->", next, dataP->dataLen, ttlData(dataP)); |
|
214 if(thisP->cliSides[scn].sc < 0) conn(scn, next); |
|
215 if(*(thisP->forw) && csP->pacing) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); } |
|
216 putN(scn); |
|
217 LOG(5, "forwarded to %u", next); |
|
218 } |
|
219 else { |
|
220 close_node(); |
|
221 closeN(sci, server); |
|
222 } |
|
223 return; |
|
224 } |
|
225 void main_loop() { |
|
226 sigset_t pacing; |
|
227 sigemptyset(&pacing); |
|
228 sigaddset(&pacing, PACING); |
|
229 union { // simple select mask debug |
|
230 fd_set rs; |
|
231 uint mask; |
|
232 } u; |
|
233 int nfds; |
|
234 FD_ZERO(&(u.rs)); nfds = 0; |
|
235 if(*(thisP->forw)) { |
|
236 FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; } |
|
237 while(nfds) { |
|
238 struct timeval t = {csP->selTO, 0}; |
|
239 LOG(5, "selecting, mask=%08x", u.mask); |
|
240 int rc; |
|
241 rc = select(nfds, &(u.rs), NULL, NULL, &t); |
|
242 if(rc < 0 && errno != EINTR) SYSERR("select"); |
|
243 if(rc > 0) { |
|
244 LOG(5, "return from select, mask=%08x", u.mask); |
|
245 if(FD_ISSET(thisP->ssc, &(u.rs))) { // ssc posted: accept & forward |
|
246 int i; |
|
247 for(i=0; thisP->srvSides[i].sc > -1 && i < thisP->nodes; i++); // find unused slot for accept |
|
248 if(i == thisP->nodes) HARDERR("can't accept, all slots in use"); |
|
249 LOG(5, "slot for accept=%d", i); |
|
250 acc(i); |
|
251 forward(i); |
|
252 } |
|
253 else // check which connected socket is posted |
|
254 for(int i = 0; i < thisP->nodes; i++) |
|
255 if(thisP->srvSides[i].sc > -1 && FD_ISSET(thisP->srvSides[i].sc, &(u.rs))) forward(i); |
|
256 } |
|
257 FD_ZERO(&(u.rs)); nfds = 0; |
|
258 if(*(thisP->forw)) { FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; } |
|
259 for(int i = 0; i < thisP->nodes; i++) { // mask all connected client side sockets for select |
|
260 int sc = thisP->srvSides[i].sc; |
|
261 if(sc > -1) { FD_SET(sc, &(u.rs)); if(sc >= nfds) nfds = sc + 1; } |
|
262 } |
|
263 } |
|
264 } |
|
265 void Node(topology topo, int *forw, int port, int first, int n, int ssl) { |
|
266 NodeT this; |
|
267 thisP = &this; |
|
268 deP = &(thisP->debug); |
|
269 DEBID("%sSSL %s node %d", ssl ? "" : "non", topo==mash ? "MASH" : "RING", port); |
|
270 LOG(4, "initializing..."); |
|
271 dataP = &thisP->data; |
|
272 Data(dataP, deP); |
|
273 thisP->topo = topo; |
|
274 thisP->locPort = port; |
|
275 thisP->first = first; |
|
276 thisP->last = first + n - 1; |
|
277 thisP->nodes = n; |
|
278 thisP->cliSides = malloc(n*sizeof(SocketT)); |
|
279 thisP->srvSides = malloc(n*sizeof(SocketT)); |
|
280 for(int k=0; k<n; k++) thisP->cliSides[k].sc = thisP->srvSides[k].sc = -1; |
|
281 thisP->kicker = (port == first); |
|
282 thisP->forw = forw; |
|
283 thisP->nodeIdx = port - first; |
|
284 thisP->closing = 0; |
|
285 thisP->ssl = ssl; |
|
286 if(thisP->ssl) { |
|
287 char s[64]; |
|
288 SSL_load_error_strings(); |
|
289 SSL_library_init(); |
|
290 LOG(4, "setting SSL contex..."); |
|
291 if(!(thisP->ctx = SSL_CTX_new(TLS_method()))) SSLERR("new SSL CTX"); |
|
292 SSL_CTX_set_mode(thisP->ctx, SSL_MODE_AUTO_RETRY); |
|
293 SSL_CTX_set_verify(thisP->ctx, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL); |
|
294 sprintf(s, "%s/keys/%d.key", csP->ceP, thisP->locPort); |
|
295 if(SSL_CTX_use_PrivateKey_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file"); |
|
296 sprintf(s, "%s/certs/%d.pem", csP->ceP, thisP->locPort); |
|
297 LOG(5, "SSL private key used: %s", s); |
|
298 if(SSL_CTX_use_certificate_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file"); |
|
299 LOG(5, "SSL certificate used: %s", s); |
|
300 if(SSL_CTX_load_verify_locations(thisP->ctx, NULL, csP->caP) != 1) SSLERR("hh's thrusted certs path"); |
|
301 } |
|
302 LOG(5, "initalized"); |
|
303 |
|
304 bindN(thisP); |
|
305 if(thisP->kicker) { |
|
306 loadData(dataP, csP->ttl, csP->text); |
|
307 int sci, next; |
|
308 next = next_node(); sci = next - thisP->first; |
|
309 char digest[24]; |
|
310 LOG(1, "KICKER: ready to initial send %s, len=%d to node %d", digest24Data(dataP, digest), dataP->dataLen, next); |
|
311 conn(sci, next); |
|
312 putN(sci); |
|
313 } |
|
314 |
|
315 main_loop(); |
|
316 |
|
317 LOG(5, "closing ssc"); |
|
318 close(thisP->ssc); |
|
319 if(thisP->closing) { // wait for closing thread |
|
320 if(pthread_join(thisP->closingThread, NULL) != 0) SYSERR("join closing thread"); } |
|
321 struct sigaction sigact; |
|
322 sigfillset(&sigact.sa_mask); |
|
323 sigact.sa_handler=sighandle; |
|
324 if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction"); |
|
325 if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait"); |
|
326 int active = --csP->shP->act; |
|
327 if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post"); |
|
328 if(active > 0) pause(); |
|
329 else kill(0, SIGUSR2); |
|
330 int exitRc = EXIT_SUCCESS; |
|
331 if(thisP->kicker && !chkData(dataP)) { |
|
332 SOFTERR("INPUT AND OUTPUT DIFFER"); |
|
333 exitRc = EXIT_FAILURE; } |
|
334 LOG(2, "ended"); |
|
335 exit(exitRc); |
|
336 } |