CSc/CS.c
changeset 0 5c129dd80d4f
equal deleted inserted replaced
-1:000000000000 0:5c129dd80d4f
       
     1 #include "CS.h"
       
     2 
       
     3 CSP csP;
       
     4 DebugP deP;
       
     5 
       
     6 void abend(DebugP deP) {
       
     7 //	LOG(0, "ABORT, netstat:");
       
     8 //	int pid;
       
     9 //	if(!(pid = fork())) {
       
    10 //		execl("/bin/bash", "/bin/bash", "-c", "netstat -n --inet -a | grep 1[123][0-9][0-9][0-9] >net_stat", (char *) NULL);
       
    11 //		exit(EXIT_SUCCESS);
       
    12 //	}
       
    13 //	waitpid(pid, NULL, 0);
       
    14 	LOG(0, "ABORT, backtrace:");
       
    15 	back_trace();
       
    16 	kill(0, SIGTERM);
       
    17 	exit(EXIT_FAILURE);
       
    18 }
       
    19 char *gpa(struct sockaddr *ai_addr) {				// returns string with IP4 address & port assigned to the socket
       
    20 	char *s = malloc(64);
       
    21 	unsigned short port = *(unsigned short*) ai_addr->sa_data;
       
    22 	char *a = ai_addr->sa_data + 2;
       
    23 	sprintf(s, "%hhu.%hhu.%hhu.%hhu %hu", a[0], a[1], a[2], a[3], ntohs(port));
       
    24 	return s;
       
    25 }
       
    26 void gai(int level, struct addrinfo *ai, DebugP pr) {	// logs assigned IP4 addresses from addrinfo chain
       
    27 	struct addrinfo *sa = ai;
       
    28 	if (csP->debMaxLev >= level) do {
       
    29 		char *s = gpa(sa->ai_addr);
       
    30 		strcpy(pr->msg, s); free(s);
       
    31 		deb(level, pr);
       
    32 	} while ((sa = sa->ai_next));
       
    33 	fflush(stderr);
       
    34 }
       
    35 void ssl_err(DebugP pr, char *s) {
       
    36 	long e;
       
    37 	e = ERR_get_error();
       
    38 	LOG(4, "ssl err, e=%lu", e);
       
    39 	while(e) {
       
    40 		LOG(0, "%s: %s", s, ERR_error_string(e, NULL));
       
    41 		e = ERR_get_error();
       
    42 		LOG(4, "ssl err, e=%lu", e);
       
    43 	}
       
    44 	abend(pr);
       
    45 }
       
    46 static int getArg(char *a) {
       
    47 	return (getenv(a) != NULL) ? atoi(getenv(a)) : 0;
       
    48 }
       
    49 static void constellation(topology topo, int ssl) {
       
    50 	DEBID("%sSSL %s", ssl ? "" : "non", topo==mash ? "MASH" : "RING");
       
    51 	int first, nodes, *forw, pid, stat, exitRc = EXIT_SUCCESS;
       
    52 	pid_t *pids;
       
    53 	if(topo==ring) { first=csP->rp0; nodes=csP->rn; }
       
    54 	if(topo==mash) { first=csP->mp0; nodes=csP->mn; }
       
    55 	if(nodes == 0) exit(0);
       
    56 	if(nodes == 1) { LOG(0, "1 node configuration not implemented"); exit(0); }
       
    57 	LOG(1, "%d node(s) starting...", nodes);
       
    58 	pids = malloc(nodes*sizeof(int));
       
    59 	first += ssl*500;
       
    60 	if((forw = mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap");
       
    61 	*forw = 1;
       
    62 	for(int i = 0; i < nodes; i++) {
       
    63 		if(!(pid = fork())) Node(topo, forw, first+i, first, nodes, ssl);
       
    64 		else { LOG(3, "node %u established in process %u", first+i, pid); pids[i] = (pid_t)pid; }	   
       
    65 	}
       
    66 	LOG(2, "all nodes established");
       
    67 	while((pid = wait(&stat)) > 0) {
       
    68 		if(WIFEXITED(stat)) {
       
    69 			LOG(4, "node process %u ended with exit(%d)", pid, WEXITSTATUS(stat));
       
    70 			if(WEXITSTATUS(stat) != EXIT_SUCCESS) exitRc = EXIT_FAILURE;
       
    71 		}
       
    72 		else { LOG(4, "node process %u crashed", pid); exitRc = EXIT_FAILURE; }
       
    73 	}
       
    74 	LOG(1, "ENDED %s", exitRc == EXIT_SUCCESS ? "OK" : "BADLY");
       
    75 	exit(exitRc);
       
    76 }
       
    77 int main(int argc, char *argv[]) {
       
    78 	CST cs;
       
    79 	csP = &cs;
       
    80 	deP = &(csP->debug);
       
    81 	ShareA(csP->shP);
       
    82 	struct sigaction ign;
       
    83 
       
    84 	if(sem_init(&(csP->shP->debugSem), 1, 1) < 0) ERR("LOG sem_init");
       
    85 	debug_init(argv[0]);
       
    86 	DEBID("client/server demo");
       
    87 	if(getArg("DEB") >= 0) csP->debMaxLev = getArg("DEB");
       
    88 
       
    89 	if(sem_init(&csP->shP->counterSem, 1, 1) < 0) SYSERR("sem_init");
       
    90 	ign.sa_flags = 0;
       
    91 	ign.sa_handler = SIG_IGN;
       
    92 	if(sigaction(SIGUSR2, &ign, NULL)<0) SYSERR("sigaction");
       
    93 
       
    94 	csP->shP->msgs = 0;
       
    95 	csP->shP->conns = 0;
       
    96 	csP->text="bla bla";
       
    97 	csP->ttl = 3;
       
    98 	csP->rp0 = 11000;
       
    99 	csP->mp0 = 12000;
       
   100 	csP->rn = 0;
       
   101 	csP->mn = 0;
       
   102 	csP->pace.tv_sec = csP->pace.tv_nsec = 0;
       
   103 	csP->connThreshold = 77;			// connection retries threshold
       
   104 	csP->connTO = 0.01 * 1000*1000;	// connection timeout in usecs
       
   105 	csP->selTO = 1;						// selection timeout in secs
       
   106 	csP->issl = 0;
       
   107 	csP->caP = (char*)"/home/local/etc/ssl/certs/";
       
   108 	char *sslPathSuffP = "/../CS/";
       
   109 
       
   110 	if(getenv("T") != NULL) csP->text = getenv("T");
       
   111 	if(getenv("CEP") != NULL) {
       
   112 		csP->ceP = (char*)malloc(strlen(getenv("CEP")));
       
   113 		strcpy(csP->ceP, getenv("CEP")); }
       
   114 	else {csP->ceP = (char*)malloc(strlen(dirname(argv[0])) + strlen(sslPathSuffP) + 1);
       
   115 		strcpy(csP->ceP, dirname(argv[0]));
       
   116 		strcpy(csP->ceP + strlen(dirname(argv[0])), sslPathSuffP); }
       
   117 	if(getenv("CAP") != NULL)  {
       
   118 		csP->caP = (char*)malloc(strlen(getenv("CAP")) + 1);
       
   119 		strcpy(csP->caP, getenv("CAP"));	}
       
   120 	if(getArg("TTL") > 0) csP->ttl = getArg("TTL");
       
   121 	if(getArg("RP0") > 0) csP->rp0 = getArg("RP0");
       
   122 	if(getArg("MP0") > 0) csP->mp0 = getArg("MP0");
       
   123 	if(getArg("N") >= 0) {
       
   124 		csP->mn = getArg("N");
       
   125 		csP->rn = csP->mn; }
       
   126 	if(getArg("SSL") >= 0) csP->issl = getArg("SSL");
       
   127 	if(getArg("RN") >= 0) csP->rn = getArg("RN");
       
   128 	if(getArg("MN") >= 0) csP->mn = getArg("MN");
       
   129 	csP->shP->act = csP->rn + csP->mn;		// initialize active node processes counter
       
   130 	if(csP->issl > 1) csP->shP->act += csP->shP->act;
       
   131 	csP->pacing = 0;
       
   132    if(getenv("P") != NULL) {
       
   133 	  	double d = atof(getenv("P"));
       
   134 	  	csP->pace.tv_sec=(time_t)trunc(d);
       
   135 	  	csP->pace.tv_nsec=(d-csP->pace.tv_sec)*1000*1000*1000;
       
   136 	  	if(csP->pace.tv_sec > 0 || csP->pace.tv_nsec > 0) csP->pacing = 1;
       
   137    }
       
   138 	if(getArg("RS") >= 0) srandom(getArg("RS"));
       
   139 //	LOG(5, "initialized: ssl=%u, ssl path: %s, CA path: %s, pace=%f",
       
   140 //				csP->issl, csP->ceP, csP->caP, (double)csP->pace.tv_sec+(double)csP->pace.tv_nsec/(1000*1000*1000));
       
   141    LOG(1, "pgm=%s, ttl=%u, pace=%lu.%03lu, seed=%u, SSL=%u, debug=%d",\
       
   142    			argv[0], csP->ttl, csP->pace.tv_sec, csP->pace.tv_nsec/(1000*1000), getArg("RS"), csP->issl, csP->debMaxLev);
       
   143    if(csP->issl > 0) LOG(3, "certs path=%s, CA certs path=%s", csP->ceP, csP->caP);
       
   144 
       
   145 	if(csP->issl < 2) {		// 0 = no ssl, 1 = with SSL, 2 = both
       
   146 		if(!fork()) constellation(ring, csP->issl);
       
   147 		if(!fork()) constellation(mash, csP->issl);
       
   148 	} else for(int i = 0; i < 2; i++) {
       
   149 		if(!fork()) constellation(ring, i);
       
   150 		if(!fork()) constellation(mash, i);
       
   151 	}
       
   152 
       
   153 	int stat, pid;
       
   154 	while((pid = wait(&stat)) > 0) if(WIFEXITED(stat)) LOG(4, "constellation process %u ended with exit(%d)", pid, WEXITSTATUS(stat));
       
   155 
       
   156 	LOG(1, "final balance: forwards=%d, connections=%d", csP->shP->msgs, csP->shP->conns);
       
   157 	return 0;
       
   158 }