CSc/Node.c
changeset 0 5c129dd80d4f
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/CSc/Node.c	Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,336 @@
+#include "CS.h"
+
+NodeP thisP;
+DebugP deP;
+DataP dataP;
+
+void sighandle(int sig) { return; }
+
+void bindN() {
+	LOG(4, "binding...");
+	struct addrinfo *sa = (struct addrinfo*)malloc(sizeof(struct addrinfo));
+	memset(sa, 0, sizeof(struct addrinfo));
+	sa->ai_family = AF_INET;
+	sa->ai_socktype = SOCK_STREAM;
+	sa->ai_protocol = 0;
+	sa->ai_flags = AI_PASSIVE;
+	char s[64];
+	sprintf(s, "%d", thisP->locPort);
+	int e;
+	if((e = getaddrinfo(NULL, s, sa, &sa)) != 0) HARDERR(gai_strerror(e));
+	GAI(4, sa);
+	if((thisP->ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc");
+   	int opt = 1;
+   	if(setsockopt(thisP->ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options");
+	if(bind(thisP->ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind");
+	if(listen(thisP->ssc, 1) < 0) SYSERR("listen");
+	LOG(2, "bound to %d", thisP->locPort);
+}
+void conn(int i, int remPort) {
+	DebugT debug, *deP = &debug;
+	DEBID("%s to %u", thisP->debug.id, remPort);
+	thisP->cliSides[i].remPort = remPort;
+	int e;
+	LOG(4, "connecting to %u...", remPort);
+	int retry = csP->connThreshold;
+	struct addrinfo *ai = (struct addrinfo*)malloc(sizeof(struct addrinfo));
+	memset(ai, 0, sizeof(struct addrinfo));
+	ai->ai_family = AF_INET;
+	ai->ai_socktype = SOCK_STREAM;
+	ai->ai_protocol = 0;
+	ai->ai_flags = 0;
+	char port[6];
+	sprintf(port, "%d", remPort);
+	if((e = getaddrinfo("localhost", port, ai, &ai)) != 0) HARDERR(gai_strerror(e));
+	GAI(3, ai);
+	if((thisP->cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc");
+	while(retry--) {
+		if(connect(thisP->cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) {
+			if(errno != ECONNREFUSED) SYSERR("connect");
+			usleep(csP->connTO);
+		}
+		else break;
+	}
+	if(retry < 1) {
+		LOG(0, "connection refused threshold %d reached", csP->connThreshold);
+		exit(EXIT_FAILURE);
+	}
+	if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
+	csP->shP->conns++;
+	if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
+	socklen_t l = sizeof(struct sockaddr);
+	struct sockaddr *sa = malloc(l);
+	if(getpeername(thisP->cliSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
+	LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->cliSides[i].sc); free(sa);
+
+	if(thisP->ssl) {
+		ERR_clear_error();
+		if(!(thisP->cliSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL");
+		if(!SSL_set_fd(thisP->cliSides[i].sslP, thisP->cliSides[i].sc)) SSLERR("client SSL set fd");
+		if((e = SSL_connect(thisP->cliSides[i].sslP)) < 1) {
+			switch(SSL_get_error(thisP->cliSides[i].sslP, e)) {
+				case SSL_ERROR_SYSCALL: SYSERR("SSL connect"); break;
+				default:	SSLERR("SSL connect"); break;
+			}
+		}
+	}
+	LOG(2, "connected via sc=%d after %d retries", thisP->cliSides[i].sc, csP->connThreshold - (retry + 1));
+}
+void acc(int i) {
+	LOG(4, "accepting...");
+	if((thisP->srvSides[i].sc = accept(thisP->ssc, NULL, NULL)) < 0) SYSERR("accept");
+	socklen_t l = sizeof(struct sockaddr);
+	struct sockaddr *sa = malloc(l);
+	if(getpeername(thisP->srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
+	LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->srvSides[i].sc); free(sa);
+	if(thisP->ssl) {
+		int e;
+		ERR_clear_error();
+		if(!(thisP->srvSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL");
+		if(!SSL_set_fd(thisP->srvSides[i].sslP, thisP->srvSides[i].sc)) SSLERR("server SSL set fd");
+		if((e = SSL_accept(thisP->srvSides[i].sslP)) < 1) {
+			switch(SSL_get_error(thisP->srvSides[i].sslP, e)) {
+				case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break;
+				default:	SSLERR("SSL accept"); break;
+			}
+		}
+	}
+	LOG(2, "accepted");
+}
+void closeN(int i, nodeside side) {
+	SocketP sc;
+	if(side) sc = thisP->srvSides; else sc = thisP->cliSides;
+	LOG(5, "closing sc=%d...", sc[i].sc);
+	if(thisP->ssl) {
+		int e;
+		if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)");
+		if(!e) {
+			LOG(5, "SSL shutdown rc=0");
+			if((e = SSL_shutdown(sc[i].sslP)) < 0) {
+				switch(SSL_get_error(sc[i].sslP, e)) {
+					case SSL_ERROR_SYSCALL:
+						if(!(e = ERR_get_error())) {
+							if(errno) SYSERR("SSL shutdown (2)");
+							break;
+						}
+						break;
+					default:	SSLERR("SSL shutdown (2)"); break;
+				}
+			}
+		}
+	}
+	close(sc[i].sc);
+	LOG(4, "closed sc=%d", sc[i].sc);
+	sc[i].sc = -1;
+}
+void *close_clients() {
+	DebugT debug, *deP = &debug;
+	DEBID("%s CLOSE clients", thisP->debug.id);
+	LOG(5, "start...");
+	for (int i = 0; i < thisP->nodes; i++) if(thisP->cliSides[i].sc > -1) closeN(i, client);
+	LOG(4, "all clients closed");
+	pthread_exit(NULL);
+}
+void close_node() {
+	if(!thisP->closing) {
+		thisP->closingThread = 0;
+		if(pthread_create(&thisP->closingThread, NULL, &close_clients, NULL) != 0) SYSERR("create thread");
+	thisP->closing = 1;
+	}
+}
+int readN(int i) {
+	LOG(5, "ready to read len=%d from sc=%d...", dataP->dataLen, thisP->srvSides[i].sc);
+	int n, rest = dataP->dataLen;
+	void *buf = dataP->contP;
+	while(rest > 0) {
+		if(thisP->ssl) { if((n = SSL_read(thisP->srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); }
+		else { if((n = read(thisP->srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); }
+		if(n == 0) {
+			LOG(4, "read EOF");
+			return 0;
+		}
+		else {
+			buf += n; rest -= n;
+			if(rest > 0) LOG(5, "partly read %d bytes", n);
+		}
+	}
+	if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
+	csP->shP->msgs++;
+	if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
+	LOG(5, "read %d from %u", dataP->dataLen, dataP->contP->hdr.listPort);
+	return dataP->dataLen;
+}
+int writeN(int i) {
+	DebugT debug, *deP = &debug;
+	DEBID("%s to %u", thisP->debug.id, thisP->cliSides[i].remPort);
+	LOG(5, "ready to write len=%d to sc=%d...", dataP->dataLen, thisP->cliSides[i].sc);
+	int n, rest = dataP->dataLen;
+	void *buf = dataP->contP;
+	while(rest > 0) {
+		if(thisP->ssl) { if((n = SSL_write(thisP->cliSides[i].sslP, buf, rest)) < 0) SSLERR("socket write"); }
+		else { if((n = write(thisP->cliSides[i].sc, buf, rest)) < 0) SYSERR("socket write"); }
+		buf += n; rest -= n;
+		if(rest > 0) LOG(5, "partly written %d bytes", n);
+	}
+	LOG(5, "written %d", dataP->dataLen);
+	return dataP->dataLen;
+}
+int getN(int i) {
+	return readN(i) > 0;
+}
+int putN(int i) {
+	dataP->contP->hdr.listPort = thisP->locPort;
+	return writeN(i) > 0;
+}
+int next_node() {
+	int next;
+	if(thisP->topo == ring) {
+		next = thisP->locPort + 1;
+		if(next > thisP->last) next = thisP->first;
+	}
+	else while((next = thisP->first + thisP->nodes * ((float)random() / RAND_MAX)) == thisP->locPort);
+	return next;
+}
+void forward(int sci) {
+	int next, scn;
+	char digest[24];
+	if(getN(sci)) {
+   	LOG(5, "received data from %u", remPortData(dataP));
+		if(thisP->kicker) {
+			LOG(4, "received from node %u: %s, ttl=%d",
+						remPortData(dataP), digest24Data(dataP, digest), ttlData(dataP));
+//			if(ttlData(dataP) == 2) sabotageData(dataP);
+//			if(ttlData(dataP) == 2) errno=0, SYSERR("signal test");
+			if(dttlData(dataP) <= 0) {
+      		LOG(1, "received after passing all %s: %s", thisP->topo==mash ? "mashes" : "rings",digest24Data(dataP, digest));
+				close_node();
+				*(thisP->forw) = 0;
+				LOG(4, "leaving forward closing");
+				return;
+			}
+		}
+		next = next_node(); scn = next - thisP->first;
+		LOG(5, "forwarding to %d, len=%d, ttl=%d --->", next, dataP->dataLen, ttlData(dataP));
+		if(thisP->cliSides[scn].sc < 0) conn(scn, next);
+		if(*(thisP->forw) && csP->pacing) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); }
+		putN(scn);
+      LOG(5, "forwarded to %u", next);
+	}
+	else {
+		close_node();
+		closeN(sci, server);
+	}
+	return;
+}
+void main_loop() {
+	sigset_t pacing;
+	sigemptyset(&pacing);
+   sigaddset(&pacing, PACING);
+	union {		// simple select mask debug
+	   fd_set rs;
+	   uint mask;
+	} u;
+	int nfds;
+	FD_ZERO(&(u.rs)); nfds = 0;
+	if(*(thisP->forw)) {
+		FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; }
+	while(nfds) {
+		struct timeval t = {csP->selTO, 0};
+		LOG(5, "selecting, mask=%08x", u.mask);
+		int rc;
+		rc = select(nfds, &(u.rs), NULL, NULL, &t);
+		if(rc < 0 && errno != EINTR) SYSERR("select");
+		if(rc > 0) {
+			LOG(5, "return from select, mask=%08x", u.mask);
+			if(FD_ISSET(thisP->ssc, &(u.rs))) {												// ssc posted: accept & forward
+				int i;
+				for(i=0; thisP->srvSides[i].sc > -1 && i < thisP->nodes; i++);		// find unused slot for accept
+				if(i == thisP->nodes) HARDERR("can't accept, all slots in use");
+				LOG(5, "slot for accept=%d", i);
+				acc(i);
+				forward(i);
+			}
+			else																						// check which connected socket is posted
+				for(int i = 0; i < thisP->nodes; i++)
+					if(thisP->srvSides[i].sc > -1 && FD_ISSET(thisP->srvSides[i].sc, &(u.rs))) forward(i);
+		}
+		FD_ZERO(&(u.rs)); nfds = 0;
+		if(*(thisP->forw)) { FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; }
+		for(int i = 0; i < thisP->nodes; i++) {											// mask all connected client side sockets for select
+			int sc = thisP->srvSides[i].sc;
+			if(sc > -1) { FD_SET(sc, &(u.rs)); if(sc >= nfds) nfds = sc + 1; }
+		}
+	}
+}
+void Node(topology topo, int *forw, int port, int first, int n, int ssl) {
+	NodeT this;
+	thisP = &this;
+	deP = &(thisP->debug);
+	DEBID("%sSSL %s node %d", ssl ? "" : "non", topo==mash ? "MASH" : "RING", port);
+	LOG(4, "initializing...");
+	dataP = &thisP->data;
+	Data(dataP, deP);
+	thisP->topo = topo;
+	thisP->locPort = port;
+	thisP->first = first;
+	thisP->last = first + n - 1;
+	thisP->nodes = n;
+	thisP->cliSides = malloc(n*sizeof(SocketT));
+	thisP->srvSides = malloc(n*sizeof(SocketT));
+	for(int k=0; k<n; k++) 	thisP->cliSides[k].sc = thisP->srvSides[k].sc = -1;
+	thisP->kicker = (port == first);
+	thisP->forw = forw;
+	thisP->nodeIdx = port - first;
+	thisP->closing = 0;
+	thisP->ssl = ssl;
+	if(thisP->ssl) {
+		char s[64];
+		SSL_load_error_strings();
+		SSL_library_init();
+		LOG(4, "setting SSL contex...");
+		if(!(thisP->ctx = SSL_CTX_new(TLS_method()))) SSLERR("new SSL CTX");
+		SSL_CTX_set_mode(thisP->ctx, SSL_MODE_AUTO_RETRY);
+		SSL_CTX_set_verify(thisP->ctx, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
+		sprintf(s, "%s/keys/%d.key", csP->ceP, thisP->locPort);
+		if(SSL_CTX_use_PrivateKey_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file");
+		sprintf(s, "%s/certs/%d.pem", csP->ceP, thisP->locPort);
+		LOG(5, "SSL private key used: %s", s);
+		if(SSL_CTX_use_certificate_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file");
+		LOG(5, "SSL certificate used: %s", s);
+		if(SSL_CTX_load_verify_locations(thisP->ctx, NULL, csP->caP) != 1)	SSLERR("hh's thrusted certs path");
+	}
+	LOG(5, "initalized");
+
+	bindN(thisP);
+	if(thisP->kicker) {
+		loadData(dataP, csP->ttl, csP->text);
+		int sci, next;
+		next = next_node(); sci = next - thisP->first;
+		char digest[24];
+		LOG(1, "KICKER: ready to initial send %s, len=%d to node %d", digest24Data(dataP, digest), dataP->dataLen, next);
+		conn(sci, next);
+		putN(sci);
+	}
+
+	main_loop();
+
+	LOG(5, "closing ssc");
+	close(thisP->ssc);
+	if(thisP->closing) {		// wait for closing thread
+		if(pthread_join(thisP->closingThread, NULL) != 0) SYSERR("join closing thread"); }
+	struct sigaction sigact;
+	sigfillset(&sigact.sa_mask);
+	sigact.sa_handler=sighandle;
+	if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction");
+	if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait");
+	int active = --csP->shP->act;
+	if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post");
+	if(active > 0)	pause();
+	else kill(0, SIGUSR2);
+	int exitRc = EXIT_SUCCESS;
+	if(thisP->kicker && !chkData(dataP)) {
+		SOFTERR("INPUT AND OUTPUT DIFFER");
+		exitRc = EXIT_FAILURE; }
+	LOG(2, "ended");
+	exit(exitRc);
+}