CScpp/CS.cpp
changeset 0 5c129dd80d4f
equal deleted inserted replaced
-1:000000000000 0:5c129dd80d4f
       
     1 #include "CS.h"
       
     2 
       
     3 CSP csP;
       
     4 
       
     5 void sighandle(int sig) { return; }
       
     6 void abend(DebugP deP) {
       
     7    int pid;
       
     8    //LOG(0, "ABORT, netstat:");
       
     9    //if(!(pid = fork())) {
       
    10    //   execl("/bin/bash", "/bin/bash", "-c", "netstat -n --inet -a | grep 1[123][0-9][0-9][0-9] >net_stat", (char *) NULL);
       
    11    //   exit(EXIT_SUCCESS);
       
    12    //}
       
    13    waitpid(pid, NULL, 0);
       
    14    LOG(0, "ABORT, backtrace:");
       
    15    deP->back_trace();
       
    16    kill(0, SIGTERM);
       
    17    exit(EXIT_FAILURE);
       
    18 }
       
    19 char *gpa(struct sockaddr *ai_addr) {							// returns string with IP4 address & port assigned to the socket
       
    20    char *s = (char*) malloc(64);
       
    21    unsigned short port = *(unsigned short*) ai_addr->sa_data;
       
    22    char *a = ai_addr->sa_data + 2;
       
    23    sprintf(s, "%hhu.%hhu.%hhu.%hhu %hu", a[0], a[1], a[2], a[3], ntohs(port));
       
    24    return s;
       
    25 }
       
    26 void gai(int level, struct addrinfo *ai, DebugP deP) {			// logs assigned IP4 addresses from addrinfo chain
       
    27    struct addrinfo *sa = ai;
       
    28    if (deP->debug >= level) do {
       
    29       char *s = gpa(sa->ai_addr);
       
    30       strcpy(deP->s, s); free(s);
       
    31       deP->deb(level);
       
    32    } while ((sa = sa->ai_next));
       
    33    fflush(stderr);
       
    34 }
       
    35 void ssl_err(char *s, DebugP deP) {
       
    36    long e = ERR_get_error();
       
    37    while(e) {
       
    38       LOG(0, "%s: %s", s, ERR_error_string(e, NULL));
       
    39       e = ERR_get_error();
       
    40    }
       
    41 }
       
    42 static int getArg(const char *a) {
       
    43 	return (getenv(a) != NULL) ? atoi(getenv(a)) : 0; }
       
    44 HeaderS::HeaderS() {
       
    45 	this->ttl = 0; }
       
    46 HeaderS::HeaderS(int ttl) {
       
    47 	this->ttl = ttl; }
       
    48 int HeaderS::len() {
       
    49 	return sizeof(HeaderS); }
       
    50 PayloadS::PayloadS() {
       
    51 	strcpy(&text, "EMPTY"); }
       
    52 PayloadS::PayloadS(const char *text) {
       
    53 	ts = time(NULL);
       
    54 	strcpy(&(this->text), text); }
       
    55 int PayloadS::check(PayloadP p) {
       
    56    return (int)(strcmp(&text, &p->text) == 0); }
       
    57 char *PayloadS::deliver() {
       
    58    return &text; }
       
    59 string PayloadS::digest() {
       
    60 	string payl_s = string(&text);
       
    61 	if(payl_s.size() < 24) return payl_s;
       
    62 	else return payl_s.substr(0, 8) + string("--------") + payl_s.substr(payl_s.size() - 8, 8); }
       
    63 void PayloadS::sabotage() {
       
    64 	text = '?'; }
       
    65 int PayloadS::len() {
       
    66 	return sizeof(PayloadS) + strlen(&text); }
       
    67 int ContainerS::len() {
       
    68 	return hdr.len() + payl.len(); }
       
    69 DataC::DataC() {}
       
    70 DataC::DataC(DebugP callerDeP) {
       
    71    deP = new DebugC();
       
    72    DEBID("%s DATA", callerDeP->debid);
       
    73    dataLen = sizeof(HeaderS) + sizeof(PayloadS) + csP->text.size();
       
    74    contP = static_cast<ContainerP>(operator new(dataLen));
       
    75    new(&contP->hdr) HeaderS();
       
    76    new(&contP->payl) PayloadS();
       
    77    LOG(5, "Empty Data instance established");
       
    78 }
       
    79 void DataC::load(int ttl, const char *text) {
       
    80    new(&contP->hdr) HeaderS(ttl);
       
    81    new(&contP->payl) PayloadS(text);
       
    82    LOG(5, "Data instance loaded with payload");
       
    83    return;
       
    84 }
       
    85 string DataC::unld() {
       
    86 	return string(&(contP->payl.text)); }
       
    87 string DataC::digest() {
       
    88 	return contP->payl.digest(); }
       
    89 int DataC::dttl() {
       
    90    return --contP->hdr.ttl; }
       
    91 int DataC::ttl() {
       
    92 	return contP->hdr.ttl; }
       
    93 bool DataC::dataOk() {
       
    94    return csP->text == string(&(contP->payl.text)); }
       
    95 int DataC::ts() {
       
    96    return contP->hdr.ts; }
       
    97 int DataC::remPort(int remPort) {
       
    98    return (contP->hdr.remPort = remPort); }
       
    99 int DataC::remPort() {
       
   100    return contP->hdr.remPort; }
       
   101 NodeC::NodeC(ConstellationP co, int port) {
       
   102    deP = new DebugC();
       
   103    DEBID("%sSSL %s node %d", co->ssl ? "" : "non", co->topo==mash ? "MASH" : "RING", port);
       
   104    LOG(4, "intializing ...");
       
   105    data = DataC(deP);
       
   106    topo = co->topo;
       
   107    locPort = port;
       
   108    first = co->first;
       
   109    nodes = co->nodes;
       
   110    last = first + nodes - 1;
       
   111    cliSides = new SocketS[nodes];
       
   112    srvSides = new SocketS[nodes];
       
   113    for(int k=0; k<nodes; k++) 	cliSides[k].sc = srvSides[k].sc = -1;
       
   114    kicker = (locPort == first);
       
   115    forwP = co->forwP;
       
   116    closing = 0;
       
   117    ssl = co->ssl;
       
   118    ssc = 0;
       
   119    if(ssl) {
       
   120       char s[128];
       
   121       SSL_load_error_strings();
       
   122       SSL_library_init();
       
   123       LOG(4, "setting SSL contex...");
       
   124       if(!(ctxP = SSL_CTX_new(SSLv23_method()))) SSLERR("new SSL CTX");
       
   125       SSL_CTX_set_mode(ctxP, SSL_MODE_AUTO_RETRY);
       
   126       SSL_CTX_set_verify(ctxP, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);	// SSL will claim partner certificate
       
   127       sprintf(s, "%s/keys/%d.key", csP->cePath.c_str(), locPort);
       
   128       LOG(5, "SSL private key used: %s", s);
       
   129       if(SSL_CTX_use_PrivateKey_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file");
       
   130       sprintf(s, "%s/certs/%d.pem", csP->cePath.c_str(), locPort);
       
   131       LOG(5, "SSL certificate used: %s", s);
       
   132       if(SSL_CTX_use_certificate_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file");
       
   133       LOG(5, "SSL: CApath: %s", csP->caPath.c_str());
       
   134       if(SSL_CTX_load_verify_locations(ctxP, NULL, csP->caPath.c_str()) != 1) SSLERR("hh's thrusted certs path");
       
   135    }
       
   136    LOG(5, "initalized");
       
   137 }
       
   138 int NodeC::run() {
       
   139    LOG(5, "binding, kicker=%d", kicker);
       
   140    bindN();
       
   141    if(kicker) {
       
   142       data.load(csP->ttl, csP->text.c_str());
       
   143       int sci, next;
       
   144       next = next_node(); sci = next - first;
       
   145       LOG(2, "ready to initial send %s, len=%d to node %d",	data.digest().c_str(), data.dataLen, next);
       
   146       conn(sci, next);
       
   147       putN(sci);
       
   148    }
       
   149    mainLoop();
       
   150    LOG(5, "closing ssc");
       
   151    close(ssc);
       
   152    if(closing) closingThread.join();		// wait for closing thread
       
   153 	struct sigaction sigact;
       
   154 	sigfillset(&sigact.sa_mask);
       
   155 	sigact.sa_handler = sighandle;
       
   156 	if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction");
       
   157 	if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait");
       
   158 	if(--csP->shP->act == 0) {
       
   159 		if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post");
       
   160 		kill(0, SIGUSR2);
       
   161 	}
       
   162 	else {
       
   163 		if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
       
   164 		pause();
       
   165 	}
       
   166    int exitRc = EXIT_SUCCESS;
       
   167    if(kicker && !data.dataOk()) {
       
   168    	SOFTERR("input and output differ");
       
   169    	exitRc = EXIT_FAILURE;
       
   170    }
       
   171    LOG(2, "ended");
       
   172    exit(exitRc);
       
   173 }
       
   174 void NodeC::mainLoop() {
       
   175    fd_set rs;
       
   176    int nfds;
       
   177    struct timeval t = {1, 0};
       
   178    nfds = 0; FD_ZERO(&rs);
       
   179    if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; }
       
   180    while(nfds) {
       
   181    	t.tv_sec = csP->selTO;
       
   182 		int rc;
       
   183 		rc = select(nfds, &rs, NULL, NULL, &t);
       
   184 		if(rc < 0 && errno != EINTR) SYSERR("select");
       
   185 		if(rc > 0) {
       
   186 			if(FD_ISSET(ssc, &rs)) {
       
   187 				int i;
       
   188 				for(i=0; srvSides[i].sc > -1 && i < nodes; i++);						// find unused slot for accept
       
   189 				if(i == nodes) HARDERR("can't accept, all slots in use");
       
   190 				LOG(5, "slot for accept=%d", i);
       
   191 				acc(i);
       
   192 				forward(i);
       
   193 			}
       
   194 			else
       
   195 				for(int i = 0; i < nodes; i++) {
       
   196 					if(srvSides[i].sc > -1 && FD_ISSET(srvSides[i].sc, &rs)) forward(i); }
       
   197       }
       
   198       FD_ZERO(&rs); nfds = 0; t.tv_sec = 1;
       
   199       for(int i = 0; i < nodes; i++) {
       
   200       	int sc = srvSides[i].sc;
       
   201       	if(sc > -1) { FD_SET(sc, &rs); if(sc >= nfds) nfds = sc + 1; }
       
   202       }
       
   203       if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; }
       
   204    }
       
   205 }
       
   206 void NodeC::forward(int sci) {
       
   207    int next, scn;
       
   208    if(getN(sci)) {
       
   209    	LOG(5, "received data from %u", data.remPort());
       
   210       if(kicker) {
       
   211       	LOG(3, "received from node %u: %s, ttl=%d", data.remPort(), data.digest().c_str(), data.ttl());
       
   212       	if(data.dttl() <= 0) {
       
   213       		LOG(1, "received after passing all %s: %s", topo==mash ? "mashes" : "rings", data.digest().c_str());
       
   214       		closeClients();
       
   215       		*(forwP) = 0;
       
   216       		LOG(4, "leaving forward closing");
       
   217       		return;
       
   218       	}
       
   219       }
       
   220       next = next_node(); scn = next - first;
       
   221       LOG(5, "forwarding len=%d to %d --->", data.dataLen, next);
       
   222       if(cliSides[scn].sc < 0) conn(scn, next);
       
   223 		if(*(forwP)) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); }
       
   224       putN(scn);
       
   225       LOG(5, "forwarded to %u", next);
       
   226    }
       
   227    else {
       
   228       closeClients();
       
   229       closeSocket(sci, server);
       
   230    }
       
   231    return;
       
   232 }
       
   233 void NodeC::bindN() {
       
   234    LOG(4, "binding...");
       
   235    struct addrinfo *sa = (struct addrinfo*)malloc(sizeof(struct addrinfo));
       
   236    memset(sa, 0, sizeof(struct addrinfo));
       
   237    sa->ai_family = AF_INET;
       
   238    sa->ai_socktype = SOCK_STREAM;
       
   239    sa->ai_protocol = 0;
       
   240    sa->ai_flags = AI_PASSIVE;
       
   241    char s[64];
       
   242    sprintf(s, "%d", locPort);
       
   243    int e;
       
   244    if((e = getaddrinfo(NULL, s, sa, &sa)) != 0) HARDERR(gai_strerror(e));
       
   245    GAI(4, sa);
       
   246    if((ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc");
       
   247    int opt = 1;
       
   248    if(setsockopt(ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options");
       
   249    if(bind(ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind");
       
   250    if(listen(ssc, 1) < 0) SYSERR("listen");
       
   251    LOG(2, "bound to %d", locPort);
       
   252 }
       
   253 void NodeC::conn(int i, int remPort) {
       
   254    DebugP deP = new DebugC();
       
   255    DEBID("%s to %u", this->deP->debid, remPort);
       
   256    cliSides[i].remPort = remPort;
       
   257    int e;
       
   258    LOG(4, "connecting to %u...", remPort);
       
   259    int retry = csP->connThreshold;
       
   260    struct addrinfo *ai = (struct addrinfo*)malloc(sizeof(struct addrinfo));
       
   261    memset(ai, 0, sizeof(struct addrinfo));
       
   262    ai->ai_family = AF_INET;
       
   263    ai->ai_socktype = SOCK_STREAM;
       
   264    ai->ai_protocol = 0;
       
   265    ai->ai_flags = 0;
       
   266    char port[6];
       
   267    sprintf(port, "%d", remPort);
       
   268    if((e = getaddrinfo("localhost", port, ai, &ai)) != 0) HARDERR(gai_strerror(e));
       
   269    GAI(3, ai);
       
   270    if((cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc");
       
   271    while(retry--) {
       
   272       if(connect(cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) {
       
   273       	if(errno != ECONNREFUSED) SYSERR("connect");
       
   274       	usleep(csP->connTO); }
       
   275       else break; }
       
   276    if(retry < 1) {
       
   277       LOG(0, "connection refused threshold %d reached", csP->connThreshold);
       
   278       exit(EXIT_FAILURE); }
       
   279    if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
       
   280    csP->shP->conns++;
       
   281    if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
       
   282    socklen_t l = sizeof(struct sockaddr);
       
   283    struct sockaddr *sa = (sockaddr*)malloc(l);
       
   284    if(getpeername(cliSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
       
   285    LOG(4, "peer: %s on sc=%d", gpa(sa), cliSides[i].sc); free(sa);
       
   286    if(ssl) {
       
   287       ERR_clear_error();
       
   288       if(!(cliSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL");
       
   289       if(!SSL_set_fd(cliSides[i].sslP, cliSides[i].sc)) SSLERR("client SSL set fd");
       
   290       if((e = SSL_connect(cliSides[i].sslP)) < 1) {
       
   291       	switch(SSL_get_error(cliSides[i].sslP, e)) {
       
   292       	case SSL_ERROR_SYSCALL:
       
   293       		ssl_err((char*)"SSL connect", deP);
       
   294       		if(e == 0) LOG(0, "SSL connect: EOF on socket");
       
   295       		else LOG(0, "SSL connect: %s (%d)", strerror(errno), errno);
       
   296       		abend(deP);
       
   297       		break;
       
   298 			default:
       
   299 				SSLERR("SSL connect");
       
   300 				break;
       
   301       	}
       
   302       }
       
   303    }
       
   304    LOG(2, "connected via sc=%d after %d retries", cliSides[i].sc, csP->connThreshold - (retry + 1));
       
   305 }
       
   306 void NodeC::acc(int i) {
       
   307    LOG(4, "accepting...");
       
   308    if((srvSides[i].sc = accept(ssc, NULL, NULL)) < 0) SYSERR("accept");
       
   309    socklen_t l = sizeof(struct sockaddr);
       
   310    struct sockaddr *sa = (sockaddr*)malloc(l);
       
   311    if(getpeername(srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
       
   312    LOG(4, "peer: %s on sc=%d", gpa(sa), srvSides[i].sc); free(sa);
       
   313    if(ssl) {
       
   314       int e;
       
   315       ERR_clear_error();
       
   316       if(!(srvSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL");
       
   317       if(!SSL_set_fd(srvSides[i].sslP, srvSides[i].sc)) SSLERR("server SSL set fd");
       
   318       if((e = SSL_accept(srvSides[i].sslP)) < 1) {
       
   319       	switch(SSL_get_error(srvSides[i].sslP, e)) {
       
   320       	case SSL_ERROR_SYSCALL:	SYSERR("SSL accept"); break;
       
   321       	default: 					SSLERR("SSL accept"); break;
       
   322       	}
       
   323       }
       
   324    }
       
   325    LOG(2, "accepted");
       
   326 }
       
   327 void NodeC::closeSocket(int i, nodeside side) {
       
   328    SocketP sc;
       
   329    if(side) sc = srvSides; else sc = cliSides;
       
   330    LOG(5, "closing sc=%d...", sc[i].sc);
       
   331    if(ssl) {
       
   332    	int e;
       
   333    	if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)");
       
   334    	if(!e) {
       
   335    		LOG(5, "SSL shutdown rc=0");
       
   336    		if((e = SSL_shutdown(sc[i].sslP)) < 0) {
       
   337    			switch(SSL_get_error(sc[i].sslP, e)) {
       
   338    			case SSL_ERROR_SYSCALL: {
       
   339    				long e;
       
   340    				if(!(e = ERR_get_error()) && errno) SYSERR("SSL shutdown (2)");
       
   341    				break;
       
   342    			}
       
   343    			default:
       
   344    				SSLERR("SSL shutdown (2)");
       
   345    				break;
       
   346    			}
       
   347    		}
       
   348    	}
       
   349    }
       
   350    close(sc[i].sc);
       
   351    LOG(4, "closed sc=%d", sc[i].sc);
       
   352    sc[i].sc = -1;
       
   353 }
       
   354 void closeCliTh(void *p) {
       
   355    NodeP nP = (NodeP)p;
       
   356    char *callerid = nP->deP->debid;
       
   357    DebugP deP = new DebugC();
       
   358    DEBID("%s CLOSE clients thread", callerid);
       
   359    LOG(5, "start...");
       
   360    for (int i = 0; i < nP->nodes; i++) if(nP->cliSides[i].sc > -1) nP->closeSocket(i, client);
       
   361    LOG(4, "all clients closed");
       
   362 }
       
   363 void NodeC::closeClients() {
       
   364    if(!closing) try { closingThread = thread(closeCliTh, this); } catch(exception e) {
       
   365    		cout << "closing tread: " << e.what() << '\n'; }
       
   366    closing = 1;
       
   367 }
       
   368 int NodeC::readN(int i) {
       
   369    LOG(5, "to read len=%d from sc=%d...", data.dataLen, srvSides[i].sc);
       
   370    int n, rest = data.dataLen;
       
   371    char *buf = (char*)data.contP;
       
   372    while(rest > 0) {
       
   373    	if(ssl) { if((n = SSL_read(srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); }
       
   374    	else { if((n = read(srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); }
       
   375    	if(n == 0) {
       
   376    		LOG(4, "read EOF");
       
   377    		return 0;
       
   378    	}
       
   379    	else {
       
   380    		buf += n; rest -= n;
       
   381    		if(rest > 0) LOG(5, "partly read %d bytes, buf=%p", n, buf);
       
   382    	}
       
   383    }
       
   384 	if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
       
   385 	csP->shP->msgs++;
       
   386 	if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
       
   387 	LOG(5, "read %d from %u", data.dataLen, data.remPort());
       
   388 	return (data.dataLen);
       
   389 }
       
   390 int NodeC::writeN(int i) {
       
   391    LOG(5, "to write len=%d to sc=%d...", data.dataLen, cliSides[i].sc);
       
   392    int n, rest = data.dataLen;
       
   393    char *buf = (char*)data.contP;
       
   394    while(rest > 0) {
       
   395 		if(ssl) { if((n = SSL_write(cliSides[i].sslP, data.contP, data.dataLen)) < 0) SSLERR("socket write"); }
       
   396 		else { if((n = write(cliSides[i].sc, data.contP, data.dataLen)) < 0) SYSERR("socket write"); }
       
   397 		buf += n; rest -= n;
       
   398 		if(rest > 0) LOG(5, "partly written %d bytes", n);
       
   399 	}
       
   400    LOG(5, "written %d", data.dataLen);
       
   401    return data.dataLen;
       
   402 }
       
   403 int NodeC::getN(int i) { return readN(i) > 0; }
       
   404 int NodeC::putN(int i) {
       
   405 	data.remPort(locPort);
       
   406 	return writeN(i) > 0; }
       
   407 int NodeC::next_node() {
       
   408    int next;
       
   409    if(topo == ring) {
       
   410       next = locPort + 1;
       
   411       if(next > last) next = first;
       
   412    }
       
   413    else while((next = first + nodes * ((float)random() / RAND_MAX)) == locPort);
       
   414    return next;
       
   415 }
       
   416 ConstellationC::ConstellationC() {
       
   417    deP = NULL;
       
   418    forwP = NULL;
       
   419    topo = ring;
       
   420    ssl = first = nodes = 0;
       
   421 };
       
   422 ConstellationC::ConstellationC(topology topo, int ssl) {
       
   423    deP = new DebugC();
       
   424    DEBID("%sSSL %s", ssl ? "" : "non", topo==mash ? "MASH" : "RING");
       
   425    this->topo = topo;
       
   426    if(topo == ring) { first=csP->rp0; nodes=csP->rn; }
       
   427    if(topo == mash) { first=csP->mp0; nodes=csP->mn; }
       
   428    this->ssl = ssl;
       
   429    first += ssl*500;
       
   430    forwP = NULL;
       
   431 }
       
   432 int ConstellationC::run() {
       
   433 	int stat = 0, pid = 0, exitRc = EXIT_SUCCESS;
       
   434    if(nodes == 0) exit(0);
       
   435    if(nodes == 1) { LOG(0, "1 node configuration not implemented"); exit(0); }
       
   436 	pid_t *pids = new pid_t[nodes];
       
   437    LOG(1, "%d nodes starting...", nodes);
       
   438    if((forwP = (int*)mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap");
       
   439    *forwP = 1;
       
   440    for(int port = first; port < first + nodes; port++) {
       
   441       if(!(pid = fork())) (new NodeC(this, port))->run();
       
   442       else { pids[port-first] = pid; LOG(4, "node %u established in process %u", port, pid); }
       
   443    }
       
   444    LOG(2, "all nodes established");
       
   445    while((pid = wait(&stat)) > 0)
       
   446      if(WIFEXITED(stat)) {
       
   447    	  LOG(4, "node process %u ended with exit(%d)", pid, WEXITSTATUS(stat));
       
   448    	  if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE;
       
   449      }
       
   450      else exitRc = EXIT_FAILURE;
       
   451    LOG(1, "ENDED %s", exitRc == EXIT_SUCCESS ? "OK" : "with ERROR");
       
   452    exit(exitRc);
       
   453 }
       
   454 CSS::CSS(DebugP deP, char *prgnP) {
       
   455    ShareA(shP);
       
   456    shP->msgs = 0;
       
   457    shP->conns = 0;
       
   458    if(sem_init(&shP->counterSem, 1, 1) < 0) SYSERR("sem_init");
       
   459    text = "bla bla";
       
   460    ttl = 3;
       
   461    rp0 = 11000;
       
   462    mp0 = 12000;
       
   463    rn = 0;
       
   464    mn = 0;
       
   465    issl = 0;
       
   466    const string sslPathSuffP = "../CS";
       
   467    caPath = "/home/local/etc/ssl/certs/";
       
   468    connThreshold = 77;			// connection retries threshold
       
   469    connTO = 0.01 * 1000*1000;	// connection sleep time in usecs
       
   470    selTO = 1;						// selection timeout in secs
       
   471 
       
   472    if(getArg("DEB") >= 0) DebugC::debug = getArg("DEB");
       
   473    if(getenv("T") != NULL) { text = getenv("T"); }
       
   474    if(getenv("CEP") != NULL) cePath = getenv("CEP");
       
   475    else { cePath = dirname(prgnP); cePath += "/"; cePath += sslPathSuffP; }
       
   476    if(getenv("CAP") != NULL)  caPath = getenv("CAP");
       
   477    if(getArg("TTL") > 0) ttl = getArg("TTL");
       
   478    if(getArg("RP0") > 0) rp0 = getArg("RP0");
       
   479    if(getArg("MP0") > 0) mp0 = getArg("MP0");
       
   480    if(getArg("N") >= 0) { mn = getArg("N"); rn = mn; }
       
   481    if(getArg("SSL") >= 0) issl = getArg("SSL");
       
   482    if(getArg("RN") >= 0) rn = getArg("RN");
       
   483    if(getArg("MN") >= 0) mn = getArg("MN");
       
   484 	shP->act = rn + mn;		// initialize active node processes counter
       
   485 	if(issl > 1) shP->act += shP->act;
       
   486 	pacing = 0;
       
   487    if(getenv("P") != NULL) {
       
   488 	  	double d = atof(getenv("P"));
       
   489 	  	pace.tv_sec=(time_t)trunc(d);
       
   490 	  	pace.tv_nsec=(d-pace.tv_sec)*1000*1000*1000;
       
   491 	  	if(pace.tv_sec > 0 || pace.tv_nsec > 0) pacing = 1;
       
   492    }
       
   493    if(getArg("RS") >= 0) srandom(getArg("RS"));
       
   494 }
       
   495 int main(int argc, char *argv[]) {
       
   496    DebugC::debug_init(argv[0]);
       
   497    DebugP deP = new DebugC();
       
   498    DEBID("client/server demo");
       
   499    csP = new CSS(deP, argv[0]);
       
   500    LOG(1, "pgm=%s, ttl=%u, pace=%lu.%03lu, seed=%u, SSL=%u, debug=%d",\
       
   501    			argv[0], csP->ttl, csP->pace.tv_sec, csP->pace.tv_nsec/(1000*1000), getArg("RS"), csP->issl, DebugC::debug);
       
   502    if(csP->issl > 0) LOG(3, "certs path=%s, CA certs path=%s", csP->cePath.c_str(), csP->caPath.c_str());
       
   503    if(csP->issl < 2) {
       
   504       if(!fork()) (new ConstellationC(ring, csP->issl))->run();
       
   505       if(!fork()) (new ConstellationC(mash, csP->issl))->run();
       
   506    } else for(int  ssl = 0; ssl < csP->issl; ssl++) {
       
   507       if(!fork()) (new ConstellationC(ring, ssl))->run();
       
   508       if(!fork()) (new ConstellationC(mash, ssl))->run();
       
   509    }
       
   510    int stat, exitRc = EXIT_SUCCESS;
       
   511    while(wait(&stat) > 0)
       
   512    	if(WIFEXITED(stat)) {
       
   513    		LOG(5, "constellation ended with exit(%d)", WEXITSTATUS(stat));
       
   514    		if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE;
       
   515    	}
       
   516       else exitRc = EXIT_FAILURE;
       
   517    LOG(1, "%s end, forwards=%d, connections=%d", exitRc == EXIT_SUCCESS ? "NORMAL" : "BAD", csP->shP->msgs, csP->shP->conns);
       
   518    exit(exitRc);
       
   519 }