diff -r 000000000000 -r 5c129dd80d4f CScpp/CS.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CScpp/CS.cpp Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,519 @@ +#include "CS.h" + +CSP csP; + +void sighandle(int sig) { return; } +void abend(DebugP deP) { + int pid; + //LOG(0, "ABORT, netstat:"); + //if(!(pid = fork())) { + // execl("/bin/bash", "/bin/bash", "-c", "netstat -n --inet -a | grep 1[123][0-9][0-9][0-9] >net_stat", (char *) NULL); + // exit(EXIT_SUCCESS); + //} + waitpid(pid, NULL, 0); + LOG(0, "ABORT, backtrace:"); + deP->back_trace(); + kill(0, SIGTERM); + exit(EXIT_FAILURE); +} +char *gpa(struct sockaddr *ai_addr) { // returns string with IP4 address & port assigned to the socket + char *s = (char*) malloc(64); + unsigned short port = *(unsigned short*) ai_addr->sa_data; + char *a = ai_addr->sa_data + 2; + sprintf(s, "%hhu.%hhu.%hhu.%hhu %hu", a[0], a[1], a[2], a[3], ntohs(port)); + return s; +} +void gai(int level, struct addrinfo *ai, DebugP deP) { // logs assigned IP4 addresses from addrinfo chain + struct addrinfo *sa = ai; + if (deP->debug >= level) do { + char *s = gpa(sa->ai_addr); + strcpy(deP->s, s); free(s); + deP->deb(level); + } while ((sa = sa->ai_next)); + fflush(stderr); +} +void ssl_err(char *s, DebugP deP) { + long e = ERR_get_error(); + while(e) { + LOG(0, "%s: %s", s, ERR_error_string(e, NULL)); + e = ERR_get_error(); + } +} +static int getArg(const char *a) { + return (getenv(a) != NULL) ? atoi(getenv(a)) : 0; } +HeaderS::HeaderS() { + this->ttl = 0; } +HeaderS::HeaderS(int ttl) { + this->ttl = ttl; } +int HeaderS::len() { + return sizeof(HeaderS); } +PayloadS::PayloadS() { + strcpy(&text, "EMPTY"); } +PayloadS::PayloadS(const char *text) { + ts = time(NULL); + strcpy(&(this->text), text); } +int PayloadS::check(PayloadP p) { + return (int)(strcmp(&text, &p->text) == 0); } +char *PayloadS::deliver() { + return &text; } +string PayloadS::digest() { + string payl_s = string(&text); + if(payl_s.size() < 24) return payl_s; + else return payl_s.substr(0, 8) + string("--------") + payl_s.substr(payl_s.size() - 8, 8); } +void PayloadS::sabotage() { + text = '?'; } +int PayloadS::len() { + return sizeof(PayloadS) + strlen(&text); } +int ContainerS::len() { + return hdr.len() + payl.len(); } +DataC::DataC() {} +DataC::DataC(DebugP callerDeP) { + deP = new DebugC(); + DEBID("%s DATA", callerDeP->debid); + dataLen = sizeof(HeaderS) + sizeof(PayloadS) + csP->text.size(); + contP = static_cast(operator new(dataLen)); + new(&contP->hdr) HeaderS(); + new(&contP->payl) PayloadS(); + LOG(5, "Empty Data instance established"); +} +void DataC::load(int ttl, const char *text) { + new(&contP->hdr) HeaderS(ttl); + new(&contP->payl) PayloadS(text); + LOG(5, "Data instance loaded with payload"); + return; +} +string DataC::unld() { + return string(&(contP->payl.text)); } +string DataC::digest() { + return contP->payl.digest(); } +int DataC::dttl() { + return --contP->hdr.ttl; } +int DataC::ttl() { + return contP->hdr.ttl; } +bool DataC::dataOk() { + return csP->text == string(&(contP->payl.text)); } +int DataC::ts() { + return contP->hdr.ts; } +int DataC::remPort(int remPort) { + return (contP->hdr.remPort = remPort); } +int DataC::remPort() { + return contP->hdr.remPort; } +NodeC::NodeC(ConstellationP co, int port) { + deP = new DebugC(); + DEBID("%sSSL %s node %d", co->ssl ? "" : "non", co->topo==mash ? "MASH" : "RING", port); + LOG(4, "intializing ..."); + data = DataC(deP); + topo = co->topo; + locPort = port; + first = co->first; + nodes = co->nodes; + last = first + nodes - 1; + cliSides = new SocketS[nodes]; + srvSides = new SocketS[nodes]; + for(int k=0; kforwP; + closing = 0; + ssl = co->ssl; + ssc = 0; + if(ssl) { + char s[128]; + SSL_load_error_strings(); + SSL_library_init(); + LOG(4, "setting SSL contex..."); + if(!(ctxP = SSL_CTX_new(SSLv23_method()))) SSLERR("new SSL CTX"); + SSL_CTX_set_mode(ctxP, SSL_MODE_AUTO_RETRY); + SSL_CTX_set_verify(ctxP, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL); // SSL will claim partner certificate + sprintf(s, "%s/keys/%d.key", csP->cePath.c_str(), locPort); + LOG(5, "SSL private key used: %s", s); + if(SSL_CTX_use_PrivateKey_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file"); + sprintf(s, "%s/certs/%d.pem", csP->cePath.c_str(), locPort); + LOG(5, "SSL certificate used: %s", s); + if(SSL_CTX_use_certificate_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file"); + LOG(5, "SSL: CApath: %s", csP->caPath.c_str()); + if(SSL_CTX_load_verify_locations(ctxP, NULL, csP->caPath.c_str()) != 1) SSLERR("hh's thrusted certs path"); + } + LOG(5, "initalized"); +} +int NodeC::run() { + LOG(5, "binding, kicker=%d", kicker); + bindN(); + if(kicker) { + data.load(csP->ttl, csP->text.c_str()); + int sci, next; + next = next_node(); sci = next - first; + LOG(2, "ready to initial send %s, len=%d to node %d", data.digest().c_str(), data.dataLen, next); + conn(sci, next); + putN(sci); + } + mainLoop(); + LOG(5, "closing ssc"); + close(ssc); + if(closing) closingThread.join(); // wait for closing thread + struct sigaction sigact; + sigfillset(&sigact.sa_mask); + sigact.sa_handler = sighandle; + if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction"); + if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait"); + if(--csP->shP->act == 0) { + if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post"); + kill(0, SIGUSR2); + } + else { + if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); + pause(); + } + int exitRc = EXIT_SUCCESS; + if(kicker && !data.dataOk()) { + SOFTERR("input and output differ"); + exitRc = EXIT_FAILURE; + } + LOG(2, "ended"); + exit(exitRc); +} +void NodeC::mainLoop() { + fd_set rs; + int nfds; + struct timeval t = {1, 0}; + nfds = 0; FD_ZERO(&rs); + if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; } + while(nfds) { + t.tv_sec = csP->selTO; + int rc; + rc = select(nfds, &rs, NULL, NULL, &t); + if(rc < 0 && errno != EINTR) SYSERR("select"); + if(rc > 0) { + if(FD_ISSET(ssc, &rs)) { + int i; + for(i=0; srvSides[i].sc > -1 && i < nodes; i++); // find unused slot for accept + if(i == nodes) HARDERR("can't accept, all slots in use"); + LOG(5, "slot for accept=%d", i); + acc(i); + forward(i); + } + else + for(int i = 0; i < nodes; i++) { + if(srvSides[i].sc > -1 && FD_ISSET(srvSides[i].sc, &rs)) forward(i); } + } + FD_ZERO(&rs); nfds = 0; t.tv_sec = 1; + for(int i = 0; i < nodes; i++) { + int sc = srvSides[i].sc; + if(sc > -1) { FD_SET(sc, &rs); if(sc >= nfds) nfds = sc + 1; } + } + if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; } + } +} +void NodeC::forward(int sci) { + int next, scn; + if(getN(sci)) { + LOG(5, "received data from %u", data.remPort()); + if(kicker) { + LOG(3, "received from node %u: %s, ttl=%d", data.remPort(), data.digest().c_str(), data.ttl()); + if(data.dttl() <= 0) { + LOG(1, "received after passing all %s: %s", topo==mash ? "mashes" : "rings", data.digest().c_str()); + closeClients(); + *(forwP) = 0; + LOG(4, "leaving forward closing"); + return; + } + } + next = next_node(); scn = next - first; + LOG(5, "forwarding len=%d to %d --->", data.dataLen, next); + if(cliSides[scn].sc < 0) conn(scn, next); + if(*(forwP)) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); } + putN(scn); + LOG(5, "forwarded to %u", next); + } + else { + closeClients(); + closeSocket(sci, server); + } + return; +} +void NodeC::bindN() { + LOG(4, "binding..."); + struct addrinfo *sa = (struct addrinfo*)malloc(sizeof(struct addrinfo)); + memset(sa, 0, sizeof(struct addrinfo)); + sa->ai_family = AF_INET; + sa->ai_socktype = SOCK_STREAM; + sa->ai_protocol = 0; + sa->ai_flags = AI_PASSIVE; + char s[64]; + sprintf(s, "%d", locPort); + int e; + if((e = getaddrinfo(NULL, s, sa, &sa)) != 0) HARDERR(gai_strerror(e)); + GAI(4, sa); + if((ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc"); + int opt = 1; + if(setsockopt(ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options"); + if(bind(ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind"); + if(listen(ssc, 1) < 0) SYSERR("listen"); + LOG(2, "bound to %d", locPort); +} +void NodeC::conn(int i, int remPort) { + DebugP deP = new DebugC(); + DEBID("%s to %u", this->deP->debid, remPort); + cliSides[i].remPort = remPort; + int e; + LOG(4, "connecting to %u...", remPort); + int retry = csP->connThreshold; + struct addrinfo *ai = (struct addrinfo*)malloc(sizeof(struct addrinfo)); + memset(ai, 0, sizeof(struct addrinfo)); + ai->ai_family = AF_INET; + ai->ai_socktype = SOCK_STREAM; + ai->ai_protocol = 0; + ai->ai_flags = 0; + char port[6]; + sprintf(port, "%d", remPort); + if((e = getaddrinfo("localhost", port, ai, &ai)) != 0) HARDERR(gai_strerror(e)); + GAI(3, ai); + if((cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc"); + while(retry--) { + if(connect(cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) { + if(errno != ECONNREFUSED) SYSERR("connect"); + usleep(csP->connTO); } + else break; } + if(retry < 1) { + LOG(0, "connection refused threshold %d reached", csP->connThreshold); + exit(EXIT_FAILURE); } + if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait"); + csP->shP->conns++; + if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); + socklen_t l = sizeof(struct sockaddr); + struct sockaddr *sa = (sockaddr*)malloc(l); + if(getpeername(cliSides[i].sc, sa, &l) < 0) SYSERR("getpeername"); + LOG(4, "peer: %s on sc=%d", gpa(sa), cliSides[i].sc); free(sa); + if(ssl) { + ERR_clear_error(); + if(!(cliSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL"); + if(!SSL_set_fd(cliSides[i].sslP, cliSides[i].sc)) SSLERR("client SSL set fd"); + if((e = SSL_connect(cliSides[i].sslP)) < 1) { + switch(SSL_get_error(cliSides[i].sslP, e)) { + case SSL_ERROR_SYSCALL: + ssl_err((char*)"SSL connect", deP); + if(e == 0) LOG(0, "SSL connect: EOF on socket"); + else LOG(0, "SSL connect: %s (%d)", strerror(errno), errno); + abend(deP); + break; + default: + SSLERR("SSL connect"); + break; + } + } + } + LOG(2, "connected via sc=%d after %d retries", cliSides[i].sc, csP->connThreshold - (retry + 1)); +} +void NodeC::acc(int i) { + LOG(4, "accepting..."); + if((srvSides[i].sc = accept(ssc, NULL, NULL)) < 0) SYSERR("accept"); + socklen_t l = sizeof(struct sockaddr); + struct sockaddr *sa = (sockaddr*)malloc(l); + if(getpeername(srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername"); + LOG(4, "peer: %s on sc=%d", gpa(sa), srvSides[i].sc); free(sa); + if(ssl) { + int e; + ERR_clear_error(); + if(!(srvSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL"); + if(!SSL_set_fd(srvSides[i].sslP, srvSides[i].sc)) SSLERR("server SSL set fd"); + if((e = SSL_accept(srvSides[i].sslP)) < 1) { + switch(SSL_get_error(srvSides[i].sslP, e)) { + case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break; + default: SSLERR("SSL accept"); break; + } + } + } + LOG(2, "accepted"); +} +void NodeC::closeSocket(int i, nodeside side) { + SocketP sc; + if(side) sc = srvSides; else sc = cliSides; + LOG(5, "closing sc=%d...", sc[i].sc); + if(ssl) { + int e; + if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)"); + if(!e) { + LOG(5, "SSL shutdown rc=0"); + if((e = SSL_shutdown(sc[i].sslP)) < 0) { + switch(SSL_get_error(sc[i].sslP, e)) { + case SSL_ERROR_SYSCALL: { + long e; + if(!(e = ERR_get_error()) && errno) SYSERR("SSL shutdown (2)"); + break; + } + default: + SSLERR("SSL shutdown (2)"); + break; + } + } + } + } + close(sc[i].sc); + LOG(4, "closed sc=%d", sc[i].sc); + sc[i].sc = -1; +} +void closeCliTh(void *p) { + NodeP nP = (NodeP)p; + char *callerid = nP->deP->debid; + DebugP deP = new DebugC(); + DEBID("%s CLOSE clients thread", callerid); + LOG(5, "start..."); + for (int i = 0; i < nP->nodes; i++) if(nP->cliSides[i].sc > -1) nP->closeSocket(i, client); + LOG(4, "all clients closed"); +} +void NodeC::closeClients() { + if(!closing) try { closingThread = thread(closeCliTh, this); } catch(exception e) { + cout << "closing tread: " << e.what() << '\n'; } + closing = 1; +} +int NodeC::readN(int i) { + LOG(5, "to read len=%d from sc=%d...", data.dataLen, srvSides[i].sc); + int n, rest = data.dataLen; + char *buf = (char*)data.contP; + while(rest > 0) { + if(ssl) { if((n = SSL_read(srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); } + else { if((n = read(srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); } + if(n == 0) { + LOG(4, "read EOF"); + return 0; + } + else { + buf += n; rest -= n; + if(rest > 0) LOG(5, "partly read %d bytes, buf=%p", n, buf); + } + } + if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait"); + csP->shP->msgs++; + if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); + LOG(5, "read %d from %u", data.dataLen, data.remPort()); + return (data.dataLen); +} +int NodeC::writeN(int i) { + LOG(5, "to write len=%d to sc=%d...", data.dataLen, cliSides[i].sc); + int n, rest = data.dataLen; + char *buf = (char*)data.contP; + while(rest > 0) { + if(ssl) { if((n = SSL_write(cliSides[i].sslP, data.contP, data.dataLen)) < 0) SSLERR("socket write"); } + else { if((n = write(cliSides[i].sc, data.contP, data.dataLen)) < 0) SYSERR("socket write"); } + buf += n; rest -= n; + if(rest > 0) LOG(5, "partly written %d bytes", n); + } + LOG(5, "written %d", data.dataLen); + return data.dataLen; +} +int NodeC::getN(int i) { return readN(i) > 0; } +int NodeC::putN(int i) { + data.remPort(locPort); + return writeN(i) > 0; } +int NodeC::next_node() { + int next; + if(topo == ring) { + next = locPort + 1; + if(next > last) next = first; + } + else while((next = first + nodes * ((float)random() / RAND_MAX)) == locPort); + return next; +} +ConstellationC::ConstellationC() { + deP = NULL; + forwP = NULL; + topo = ring; + ssl = first = nodes = 0; +}; +ConstellationC::ConstellationC(topology topo, int ssl) { + deP = new DebugC(); + DEBID("%sSSL %s", ssl ? "" : "non", topo==mash ? "MASH" : "RING"); + this->topo = topo; + if(topo == ring) { first=csP->rp0; nodes=csP->rn; } + if(topo == mash) { first=csP->mp0; nodes=csP->mn; } + this->ssl = ssl; + first += ssl*500; + forwP = NULL; +} +int ConstellationC::run() { + int stat = 0, pid = 0, exitRc = EXIT_SUCCESS; + if(nodes == 0) exit(0); + if(nodes == 1) { LOG(0, "1 node configuration not implemented"); exit(0); } + pid_t *pids = new pid_t[nodes]; + LOG(1, "%d nodes starting...", nodes); + if((forwP = (int*)mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap"); + *forwP = 1; + for(int port = first; port < first + nodes; port++) { + if(!(pid = fork())) (new NodeC(this, port))->run(); + else { pids[port-first] = pid; LOG(4, "node %u established in process %u", port, pid); } + } + LOG(2, "all nodes established"); + while((pid = wait(&stat)) > 0) + if(WIFEXITED(stat)) { + LOG(4, "node process %u ended with exit(%d)", pid, WEXITSTATUS(stat)); + if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE; + } + else exitRc = EXIT_FAILURE; + LOG(1, "ENDED %s", exitRc == EXIT_SUCCESS ? "OK" : "with ERROR"); + exit(exitRc); +} +CSS::CSS(DebugP deP, char *prgnP) { + ShareA(shP); + shP->msgs = 0; + shP->conns = 0; + if(sem_init(&shP->counterSem, 1, 1) < 0) SYSERR("sem_init"); + text = "bla bla"; + ttl = 3; + rp0 = 11000; + mp0 = 12000; + rn = 0; + mn = 0; + issl = 0; + const string sslPathSuffP = "../CS"; + caPath = "/home/local/etc/ssl/certs/"; + connThreshold = 77; // connection retries threshold + connTO = 0.01 * 1000*1000; // connection sleep time in usecs + selTO = 1; // selection timeout in secs + + if(getArg("DEB") >= 0) DebugC::debug = getArg("DEB"); + if(getenv("T") != NULL) { text = getenv("T"); } + if(getenv("CEP") != NULL) cePath = getenv("CEP"); + else { cePath = dirname(prgnP); cePath += "/"; cePath += sslPathSuffP; } + if(getenv("CAP") != NULL) caPath = getenv("CAP"); + if(getArg("TTL") > 0) ttl = getArg("TTL"); + if(getArg("RP0") > 0) rp0 = getArg("RP0"); + if(getArg("MP0") > 0) mp0 = getArg("MP0"); + if(getArg("N") >= 0) { mn = getArg("N"); rn = mn; } + if(getArg("SSL") >= 0) issl = getArg("SSL"); + if(getArg("RN") >= 0) rn = getArg("RN"); + if(getArg("MN") >= 0) mn = getArg("MN"); + shP->act = rn + mn; // initialize active node processes counter + if(issl > 1) shP->act += shP->act; + pacing = 0; + if(getenv("P") != NULL) { + double d = atof(getenv("P")); + pace.tv_sec=(time_t)trunc(d); + pace.tv_nsec=(d-pace.tv_sec)*1000*1000*1000; + if(pace.tv_sec > 0 || pace.tv_nsec > 0) pacing = 1; + } + if(getArg("RS") >= 0) srandom(getArg("RS")); +} +int main(int argc, char *argv[]) { + DebugC::debug_init(argv[0]); + DebugP deP = new DebugC(); + DEBID("client/server demo"); + csP = new CSS(deP, argv[0]); + LOG(1, "pgm=%s, ttl=%u, pace=%lu.%03lu, seed=%u, SSL=%u, debug=%d",\ + argv[0], csP->ttl, csP->pace.tv_sec, csP->pace.tv_nsec/(1000*1000), getArg("RS"), csP->issl, DebugC::debug); + if(csP->issl > 0) LOG(3, "certs path=%s, CA certs path=%s", csP->cePath.c_str(), csP->caPath.c_str()); + if(csP->issl < 2) { + if(!fork()) (new ConstellationC(ring, csP->issl))->run(); + if(!fork()) (new ConstellationC(mash, csP->issl))->run(); + } else for(int ssl = 0; ssl < csP->issl; ssl++) { + if(!fork()) (new ConstellationC(ring, ssl))->run(); + if(!fork()) (new ConstellationC(mash, ssl))->run(); + } + int stat, exitRc = EXIT_SUCCESS; + while(wait(&stat) > 0) + if(WIFEXITED(stat)) { + LOG(5, "constellation ended with exit(%d)", WEXITSTATUS(stat)); + if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE; + } + else exitRc = EXIT_FAILURE; + LOG(1, "%s end, forwards=%d, connections=%d", exitRc == EXIT_SUCCESS ? "NORMAL" : "BAD", csP->shP->msgs, csP->shP->conns); + exit(exitRc); +}