# HG changeset patch # User hh # Date 1574344510 -3600 # Node ID 5c129dd80d4f885237e75c6be32b53233d08bd09 -- diff -r 000000000000 -r 5c129dd80d4f CS/CS.cfg --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CS/CS.cfg Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,335 @@ +# +# OpenSSL example configuration file. +# This is mostly being used for generation of certificate requests. +# + +# This definition stops the following lines choking if HOME isn't +# defined. +HOME = . +RANDFILE = $ENV::HOME/.rnd + +# Extra OBJECT IDENTIFIER info: +#oid_file = $ENV::HOME/.oid +oid_section = new_oids + +# To use this configuration file with the "-extfile" option of the +# "openssl x509" utility, name here the section containing the +# X.509v3 extensions to use: +# extensions = +# (Alternatively, use a configuration file that has only +# X.509v3 extensions in its main [= default] section.) + +[ new_oids ] + +# We can add new OIDs in here for use by 'ca' and 'req'. +# Add a simple OID like this: +# testoid1=1.2.3.4 +# Or use config file substitution like this: +# testoid2=${testoid1}.5.6 + +#################################################################### +[ ca ] +default_ca = CA_default # The default ca section + +#################################################################### +[ CA_default ] + +dir = /home/local/etc/ssl # Where everything is kept +certs = $dir/certs # Where the issued certs are kept +crl_dir = $dir/crl # Where the issued crl are kept +database = $dir/index.txt # database index file. +#unique_subject = no # Set to 'no' to allow creation of + # several ctificates with same subject. +new_certs_dir = $dir/newcerts # default place for new certs. + +certificate = $dir/certs/hh_ca.crt # The CA certificate +serial = $dir/serial # The current serial number +crlnumber = $dir/crlnumber # the current crl number + # must be commented out to leave a V1 CRL +crl = $dir/crl.pem # The current CRL +private_key = $dir/private/hh_ca.key # The private key +RANDFILE = $dir/private/.rand # private random number file + +x509_extensions = usr_cert # The extentions to add to the cert + +# Comment out the following two lines for the "traditional" +# (and highly broken) format. +name_opt = ca_default # Subject Name options +cert_opt = ca_default # Certificate field options + +# Extension copying option: use with caution. +# copy_extensions = copy + +# Extensions to add to a CRL. Note: Netscape communicator chokes on V2 CRLs +# so this is commented out by default to leave a V1 CRL. +# crlnumber must also be commented out to leave a V1 CRL. +crl_extensions = crl_ext + +default_days = 365 # how long to certify for +default_crl_days= 30 # how long before next CRL +default_md = sha1 # which md to use. +preserve = no # keep passed DN ordering + +# A few difference way of specifying how similar the request should look +# For type CA, the listed attributes must be the same, and the optional +# and supplied fields are just that :-) +policy = policy_anything + +#################################################################### +# For the CA policy +[ policy_match ] +countryName = match +stateOrProvinceName = match +organizationName = match +organizationalUnitName = optional +commonName = supplied +emailAddress = optional + +#################################################################### +# For the 'anything' policy +# At this point in time, you must list all acceptable 'object' +# types. +[ policy_anything ] +countryName = optional +stateOrProvinceName = optional +localityName = optional +organizationName = optional +organizationalUnitName = optional +commonName = supplied +emailAddress = optional + +#################################################################### +[ req ] +default_bits = 2048 +default_keyfile = privkey.pem +distinguished_name = req_distinguished_name +attributes = req_attributes +x509_extensions = v3_ca # The extentions to add to the self signed cert + +# Passwords for private keys if not present they will be prompted for +# input_password = secret +# output_password = secret + +# This sets a mask for permitted string types. There are several options. +# default: PrintableString, T61String, BMPString. +# pkix : PrintableString, BMPString. +# utf8only: only UTF8Strings. +# nombstr : PrintableString, T61String (no BMPStrings or UTF8Strings). +# MASK:XXXX a literal mask value. +# WARNING: current versions of Netscape crash on BMPStrings or UTF8Strings +# so use this option with caution! +string_mask = nombstr +prompt = no + +#################################################################### +# req_extensions = v3_req # The extensions to add to a certificate request +[ req_distinguished_name ] +countryName = CZ +stateOrProvinceName = -- +localityName = Praha +0.organizationName = H.H. +organizationalUnitName = -- +commonName = $ENV::CN +emailAddress = hh@hh.cz + +# SET-ex3 = SET extension number 3 + +[ req_attributes ] +#challengePassword = A challenge password +#challengePassword_min = 4 +#challengePassword_max = 20 + +#unstructuredName = An optional company name + +#################################################################### +[ usr_cert ] +# These extensions are added when 'ca' signs a request. + +# This goes against PKIX guidelines but some CAs do it and some software +# requires this to avoid interpreting an end user certificate as a CA. +basicConstraints=CA:FALSE + +# Here are some examples of the usage of nsCertType. If it is omitted +# the certificate can be used for anything *except* object signing. +# This is OK for an SSL server. +# nsCertType = server +# For an object signing certificate this would be used. +# nsCertType = objsign +# For normal client use this is typical +# nsCertType = client, email +# and for everything including object signing: +# nsCertType = client, email, objsign + +# This is typical in keyUsage for a client certificate. +keyUsage = nonRepudiation, digitalSignature, keyEncipherment + +# This will be displayed in Netscape's comment listbox. +nsComment = "hh_ca - OpenSSL Generated Certificate" + +# PKIX recommendations harmless if included in all certificates. +subjectKeyIdentifier=hash +#authorityKeyIdentifier=keyid:always,issuer:always + +# This stuff is for subjectAltName and issuerAltname. +# Import the email address. +subjectAltName=email:copy +# An alternative to produce certificates that aren't +# deprecated according to PKIX. +# subjectAltName=email:move + +# Copy subject details +#issuerAltName=issuer:copy + +nsCaRevocationUrl = http://www.hh.cz/ca-crl.pem +#nsBaseUrl +nsRevocationUrl = http://www.hh.cz/ca-crl.pem +#nsRenewalUrl +#nsCaPolicyUrl +#nsSslServerName + +#################################################################### +[ srv_cert ] +# These extensions are added when 'ca' signs a request. + +# This goes against PKIX guidelines but some CAs do it and some software +# requires this to avoid interpreting an end user certificate as a CA. +basicConstraints=CA:FALSE + +# Here are some examples of the usage of nsCertType. If it is omitted +# the certificate can be used for anything *except* object signing. +# This is OK for an SSL server. +nsCertType = server +# For an object signing certificate this would be used. +# nsCertType = objsign +# For normal client use this is typical +# nsCertType = client, email +# and for everything including object signing: +# nsCertType = client, email, objsign + +# This is typical in keyUsage for a client certificate. +keyUsage = nonRepudiation, digitalSignature, keyEncipherment + +# This will be displayed in Netscape's comment listbox. +nsComment = "hh_ca - OpenSSL Generated Certificate" + +# PKIX recommendations harmless if included in all certificates. +subjectKeyIdentifier=hash +#authorityKeyIdentifier=keyid:always,issuer:always + +# This stuff is for subjectAltName and issuerAltname. +# Import the email address. +subjectAltName=email:copy +# An alternative to produce certificates that aren't +# deprecated according to PKIX. +# subjectAltName=email:move + +# Copy subject details +#issuerAltName=issuer:copy + +nsCaRevocationUrl = http://www.hh.cz/ca-crl.pem +#nsBaseUrl +nsRevocationUrl = http://www.hh.cz/ca-crl.pem +#nsRenewalUrl +#nsCaPolicyUrl +#nsSslServerName + +#################################################################### +[ v3_req ] +# Extensions to add to a certificate request + +basicConstraints = CA:FALSE +keyUsage = nonRepudiation, digitalSignature, keyEncipherment + +#################################################################### +[ v3_ca ] +# Extensions for a typical CA + +# PKIX recommendation. +subjectKeyIdentifier=hash +#authorityKeyIdentifier=keyid:always,issuer:always + +# This is what PKIX recommends but some broken software chokes on critical +# extensions. +#basicConstraints = critical,CA:true +# So we do this instead. +basicConstraints = CA:true + +# Key usage: this is typical for a CA certificate. However since it will +# prevent it being used as an test self-signed certificate it is best +# left out by default. +# keyUsage = cRLSign, keyCertSign + +# Some might want this also +nsCertType = sslCA, emailCA + +# Include email address in subject alt name: another PKIX recommendation +subjectAltName=email:copy +# Copy issuer details +#issuerAltName=issuer:copy + +# DER hex encoding of an extension: beware experts only! +# obj=DER:02:03 +# Where 'obj' is a standard or added object +# You can even override a supported extension: +# basicConstraints= critical, DER:30:03:01:01:FF + +#################################################################### +[ crl_ext ] +# CRL extensions. +# Only issuerAltName and authorityKeyIdentifier make any sense in a CRL. + +# issuerAltName=issuer:copy +authorityKeyIdentifier=keyid:always,issuer:always + +#################################################################### +[ proxy_cert_ext ] +# These extensions should be added when creating a proxy certificate + +# This goes against PKIX guidelines but some CAs do it and some software +# requires this to avoid interpreting an end user certificate as a CA. +basicConstraints=CA:FALSE + +# Here are some examples of the usage of nsCertType. If it is omitted +# the certificate can be used for anything *except* object signing. + +# This is OK for an SSL server. +# nsCertType = server + +# For an object signing certificate this would be used. +# nsCertType = objsign + +# For normal client use this is typical +# nsCertType = client, email + +# and for everything including object signing: +# nsCertType = client, email, objsign + +# This is typical in keyUsage for a client certificate. +# keyUsage = nonRepudiation, digitalSignature, keyEncipherment + +# This will be displayed in Netscape's comment listbox. +nsComment = "hh_ca - OpenSSL Generated Certificate" + +# PKIX recommendations harmless if included in all certificates. +subjectKeyIdentifier=hash +authorityKeyIdentifier=keyid,issuer:always + +# This stuff is for subjectAltName and issuerAltname. +# Import the email address. +subjectAltName=email:copy +# An alternative to produce certificates that aren't +# deprecated according to PKIX. +# subjectAltName=email:move + +# Copy subject details +issuerAltName=issuer:copy + +nsCaRevocationUrl = http://www.hh.cz/ca-crl.pem +#nsBaseUrl +nsRevocationUrl = http://www.hh.cz/ca-crl.pem +#nsRenewalUrl +#nsCaPolicyUrl +#nsSslServerName + +# This really needs to be in place for it to be a proxy certificate. +proxyCertInfo=critical,language:id-ppl-anyLanguage,pathlen:3,policy:foo diff -r 000000000000 -r 5c129dd80d4f CS/certs.sh --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CS/certs.sh Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,7 @@ +#!/bin/bash +read -sp "CA master pw: " PW +for i in `seq 11500 11599`; do + for j in 0 1000; do + echo "$PW" | CN=$i openssl ca -config CS.cfg -in req/$((i+j)).req -out certs/$((i+j)).pem -batch -passin stdin || break + done +done diff -r 000000000000 -r 5c129dd80d4f CS/keys_reqs.sh --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CS/keys_reqs.sh Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,7 @@ +#!/bin/bash +for i in `seq 11600 11699`; do + for j in 0 1000 2000; do + # PROMPT=no DN=auto CN=$((i+j)) openssl req -config /home/local/etc/ssl/hh_ca.cfg -new -out req/$((i+j)).req -keyout keys/$((i+j)).key -nodes + CN=$((i+j)) openssl req -config /home/local/etc/ssl/hh_ca.cfg -new -out req/$((i+j)).req -keyout keys/$((i+j)).key -nodes + done +done diff -r 000000000000 -r 5c129dd80d4f CSa32/CS.S --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSa32/CS.S Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,556 @@ + .include "DS.S" +#----------------------------------------------- +# S T A T I C V A R I A B L E S +#----------------------------------------------- + .data + .global C.csP +C.csP: .int 0 # -->CS + .text +#----------------------------------------------- +# M A I N R O U T I N E +#----------------------------------------------- +# takes values from ENV, establishes task for comm nodes and waits for their completion + ARGS + DS prgP # -->progname +# returns nothing + PROLOC + DL this, CSL # CS - top level attr vector + DL deP # -->Debug + DL status # status returned from wait + DL pid # PID returned from wait + DL bad # cummulative subtask rc + DL s, 256 # string buf + EPILOC +#----------------------------------------------- + .global _start +_start: + PROLOG + lea this(bp), b # -->CS + mov b, C.csP # save -->CS + lea C.debug(b), a # -->Debug + mov a, deP(bp) +# initialize static debug vars + push prgP(bp) # argv[0] -->prgname + call D.init # initialize static debug vars + push D.prgNameP +# indetify itself + DEBID "client/server demo" + movl $1, C.debMaxLev(b) # default debug level +# initialize top level ctrl values vector + ShareA # allocate shared structure + mov C.csP, b + mov a, C.shP(b) # save -->shared struct in vector + movl $0, S.msgs(a) # init msg cntr + movl $0, S.conns(a) # init connection cntr + push $1 # initial semaphore value + push $1 # semaphore shared between processes + lea S.counter_sem(a), c + push c +# call sem_init # initialize shared counters semaphore +# cmp $0, a +# jz 0f +# SYSERR "sem_init of shared counters semaphore" +#0: + SYS sem_init +# set default values + mov C.csP, b + movl $3, C.ttl(b) # default TTL + movl $11000, C.rp0(b) # default bind port for the 1. node in ring + movl $12000, C.mp0(b) # default bind port for the 1. node in mash + movl $0, C.rn(b) # default # of nodes in ring + movl $0, C.mn(b) # default # of nodes in mash + movl $0, C.ssl(b) # default ssl switch - 0=noSSL + movl $77, C.connTh(b) # conn retries threshold + mov $0f, a + mov a, C.txtP(b) # default payload text + jmp 1f +0: .ascii "bla bla\0" +1: +# get control values from ENV +# get max debug level + GETINTENV "DEB" + mov C.csP, b + mov a, C.debMaxLev(b) + cmp $-1, a # debug level -1 means search subroutines + jne 0f + call D.subr + jmp C.ret +0: +# get message payload text + push $0f + call getenv + cmp $0, a + jz 1f + mov C.csP, b + mov a, C.txtP(b) + jmp 1f +0: .asciz "T" +1: +# get TTL + GETINTENV "TTL" + jz 0f + mov a, C.ttl(b) +0: +# get # of nodes in each topology + GETINTENV "I" + jz 0f + mov a, C.rn(b) + mov a, C.mn(b) +0: +# get # of nodes in ring + GETINTENV "RN" + jz 0f + mov a, C.rn(b) +0: +# get # of nodes in mash + GETINTENV "MN" + jz 0f + mov a, C.mn(b) +0: + add C.rn(b), a + cmp $3, C.ssl(b) + jne 1f + add a, a # double # of nodes when both SSL and nonSSL +1: mov C.shP(b), c # -->shared counters + mov a, S.act(c) # save # of active nodes +# get first ring node's bind port# + GETINTENV "RP0" + jz 0f + mov a, C.rp0(b) +0: +# get first mash node's bind port# + GETINTENV "MP0" + jz 0f + mov a, C.mp0(b) +0: +# get random() seed + GETINTENV "RS" + jz 0f + push a + call srandom +0: +# get pacing interval (real num in seconds) + movl $0, C.pace.tv_sec(b) + movl $0, C.pace.tv_nsec(b) + movl $0, C.pacing(b) + push $0f + call getenv + jmp 1f +0: .asciz "P" +1: + test a, a # env P set ? + jz 3f # no + push a + call atof # convert to double + fstl (sp) # tempor save + mov C.csP, b + fisttpl C.pace.tv_sec(b) # truncated integral part = secs + fldl (sp) + fisubl C.pace.tv_sec(b) # fraction part + fimull 1f # * 10^9 = nanosecs + fistl C.pace.tv_nsec(b) + jmp 2f +0: .double 0 +1: .int 1000000000 # 10^9 +2: + cmp $0, C.pace.tv_sec(b) + jnz 0f + cmp $0, C.pace.tv_nsec(b) + jz 3f +0: movl $1, C.pacing(b) +3: +# get ssl switch value and save it as mask +# switch: 0 = noSSL, 1 = SSL, 2 = both +# mask: 01B=noSSL, 10B=SSL, 11B=both + GETINTENV "SSL" # returned zero means SSL=0 or SSL by default 0 + inc a # change switch to mask + mov a, C.ssl(b) + cmp $1, a + je C.cont # no SSL + cmp $2, a + je 0f # only SSL + mov C.shP(b), a # -->shared mem + shll $1, S.act(a) # double # of active nodes when running both SSL and nonSSL +# get SSL CA cert dir path +0: + push $0f + call getenv + jmp 1f +0: .asciz "CAP" +1: + test a, a + jnz 1f + mov $0f, a + jmp 1f +0: .asciz "/home/local/etc/ssl/certs/" +1: + mov C.csP, b + mov a, C.caPathP(b) # save -->SSL CA certs path +# get SSL dir path + push $0f + call getenv + jmp 1f +0: .asciz "CEP" +1: + test a, a + jz 0f # ceP not set, try to determine + mov C.csP, b + mov a, C.cePathP(b) # save -->SSL dir path + jmp 1f +# determine home path (needed to locate SSL keys & certificates) +0: + push prgP(bp) # -->first parm - progname w/ path + call dirname # get prog dirname + push a + call strlen # dirname length + lea 1f-0f(a), a # + suffix length + sub a, sp # allocate space for cePath + mov C.csP, b + mov sp, C.cePathP(b) # save -->SSL path + push prgP(bp) + call dirname # get home dirname + push a # -->home dirname + mov C.csP, b + push C.cePathP(b) # -->SSL path + call strcpy # copy dirname to SSL path + call strlen # end of dirname + push $0f # -->suffix + mov C.csP, b + mov C.cePathP(b), c + lea (c, a), a # -->end of dirname + push a + call strcpy # copy suffix to SSL path + jmp 1f +0: .asciz "/../CS/" +1: +# testing sandbox + mov C.csP, b + pushl C.debMaxLev(b) + cmp $9, C.debMaxLev(b) + jne C.cont + + LOG 9, "debug=%u, testing...", 1 + DebugA + mov a, deP(bp) + DEBID "TEST" + LOG 9, "progress" + + mov C.csP, b + mov C.shP(b), a + push S.act(a) + LOG 9, "shared act=%u" + + jmp C.ret +# normal execution +C.cont: + push C.debMaxLev(b) + push C.ssl(b) + push C.pace.tv_nsec(b) + push C.pace.tv_sec(b) + push C.ttl(b) + push C.rn(b) + push C.mn(b) + push D.prgNameP + LOG 1, "pgm=%s, mash nodes=%d, ring nodes=%d, ttl=%d, pacing=%ld.%09ld, SSL mask=0x%02x, debug=%u", 8 + testl $2, C.ssl(b) + jz 0f + push C.caPathP(b) + push C.cePathP(b) + LOG 1, "SSL path=%s, SSL CA path=%s" +0: +# create constellation processes RING/MASH, nonSSL/SSL + mov $0, c # iter counter + mov C.ssl(b), d # SSL mask (01b = noSSL, 10b = SSL, 11b = both) +# iterate on SSL variants +C.iterateOnSsl: + test $1, d # check lowest bit of mask + jz C.nextSsslVar # next SSL variant + pusha +# create RING + SYS fork + jnz 1f # parent + popa + pushl $Cn.ring # RING topology + push c # use iter ctr as SSL switch + call Constellation # create RING constellation +1: push a + LOG 5, "RING started in process %d" + lea 4(sp), sp +# create MASH + SYS fork + jnz 1f # parent + popa + pushl $Cn.mash # MASH topology + push c # use iter ctr as SSL switch + call Constellation # create MASH constellation +1: push a + LOG 5, "MASH started in process %d" + lea 4(sp), sp + + popa +C.nextSsslVar: + shr $1, d # shift to test next SSL bit + inc c # incr ctr + cmp $2, c + jl C.iterateOnSsl +# wait for constellation processes completion + LOG 5, "waiting for constellations to terminate" + movl $0, bad(bp) # clear cummulative rc +C.iterateOnWait: + lea status(bp), a + push a # -->return status of task + call wait + mov a, pid(bp) # save pid + cmp $-1, a # a task ended? + jne 0f # yes + call __errno_location + cmp $10, (a) # error == ECHILD ? + je C.ret # yes, no other subtasks + SYSERR "wait" +0: +# push status(bp) # status of task +# push a # pid +# LOG 5, "status returned from task %u: 0x%08x" + mov status(bp), d + test $0x7f, d + jnz 1f # task killed, ABEND + and $0xff00, d # task exited, extract rc + shr $8, d + or d, bad(bp) # accumulate rc + push d # task rc + push pid(bp) # task pid + LOG 5, "constellation task %u ended with exit(%d)" + jmp C.iterateOnWait # wait for other tasks +1: push d + push pid(bp) + LOG 5, "constellation task %u killed, status=0x%x", 2 +2: +# ABEND + LOG 0, "ABEND, kill all tasks" + pushl $15 # SIGTERM + pushl $0 # all tasks +# call kill + SYS kill + pushl $1 + call exit +# normal end +C.ret: movl C.csP, b # -->CS vector + movl C.shP(b), b # -->Share + pushl S.conns(b) # no. of connections made + pushl S.msgs(b) # no. of messages sent + LOG 1, "END, forwards=%d, connections=%d", 2 + push bad(bp) + call exit +#----------------------------------------------- +# C O N S T E L L A T I O N O P E R A T I O N S +#----------------------------------------------- + ARGS + DS ssl # ssl switch + DS topo # topology +# returns: nothing + PROLOC + DL this, ConstellationL # this Constellation instance + DL thisP # -->this Constellation + DL deP # -->Debug + DL last # last node# + DL pid # pid returned from wait + DL stat # stat returned from wait + DL bad # "some node BAD" exit indicator + DL catched # count of returned node tasks + DL killed # count of killed node tasks + EPILOC +#----------------------------------------------- + .global Constellation +Constellation: + PROLOG + lea this(bp), b # -->this Constellation + mov b, thisP(bp) # save -->this Constellation +# set debid + lea Cn.debug(b), a # -->Debug + mov a, deP(bp) # save -->Debug locally + mov C.csP, c # -->CS + cmp $Cn.ring, topo(bp) # ring topology ? + je 0f + mov C.mn(c), a # num. of nodes + mov a, Cn.nodes(b) + mov C.mp0(c), a # port # of fist node + mov a, Cn.first(b) + movl $Cn.mash, Cn.topo(b) + push $8f + jmp 1f +0: mov C.rn(c), a # num. of nodes + mov a, Cn.nodes(b) + mov C.rp0(c), a # port # of fist node + mov a, Cn.first(b) + movl $Cn.ring, Cn.topo(b) + push $7f +1: cmp $0, ssl(bp) # SSL ? + jz 2f # no + movl $1, Cn.ssl(b) + addl $500, Cn.first(b) # first SSL port # + push $6f + jmp 9f +2: movl $0, Cn.ssl(b) + push $5f + jmp 9f +5: .ascii "non\0" +6: .ascii "\0" +7: .ascii "RING\0" +8: .ascii "MASH\0" +9: DEBID "%sSSL %s", 2 +# check # of nodes + movL $0, bad(bp) + cmp $1, Cn.nodes(b) # num of nodes + jl Cn.ret # < 1 ? nothing to do + jg 0f + LOG 0, "1 node configuration not implemented yet" + jmp Cn.ret +0: LOG 5, "initializing..." +# determine divisor for random next node choise + mov $1, a + shl $31, a + not a # MAX_INT + xor d, d + divl Cn.nodes(b) + mov a, Cn.div(b) # save divisor (MAX_INT / nodes) +# allocate "forward" indicator shared by nodes in constellation + push $0 + push $-1 + push $0x21 # PROT_READ | PROT_WRITE + push $0x03 # MAP_SHARED | MAP_ANONYMOUS + push $4 + push $0 +# call mmap +# cmp $-1, a +# jne 0f +# SYSERR "mmap" +#0: mov thisP(bp), b + SYS mmap + mov a, Cn.forwP(b) # save -->forw + movl $1, (a) # enable forwarding + push Cn.nodes(b) + LOG 1, "%u nodes starting..." +# start processes for all nodes in constellation + mov Cn.first(b), d # first node# + mov d, c + add Cn.nodes(b), c # last node + 1 +Cn.iterateOnFork: + pusha +# call fork + SYS fork + cmp $0, a + jnz 1f # parent + popa + push d # node's port# + push b # -->Cnstlln + call Node +1: mov a, pid(bp) + popa + push pid(bp) # nodes's pid + push d # node's port# + LOG 4, "node %u established in process %u", 2 + inc d + cmp d, c # last node ? + jg Cn.iterateOnFork # no, continue forking +# wait for completion of node processes + LOG 2, "all nodes established, waiting for them to terminate..." + movl $0, bad(bp) # accumulated return status of node tasks + movl $0, killed(bp) # num. of killed node tasks + movl $0, catched(bp) # num. of returned node tasks + lea -12(sp), sp # prepare space for loop +Cn.iterateOnWait: + cmp $0, bad(bp) # constellation status still OK ? + je 1f # yes + mov Cn.forwP(b), a # -->forwarding switch + movl $0, (a) # disable forwarding between nodes +1: + lea stat(bp), c + mov c, (sp) # -->return status of task + call wait + mov thisP(bp), b + cmp $-1, a # normal return from wait? + jne 0f # yes + call __errno_location + cmp $10, (a) # error == ECHILD ? + je Cn.allFinished # yes, no subtasks + SYSERR "wait" +0: + incl catched(bp) + mov a, pid(bp) # save subtask's pid + mov a, (sp) + mov stat(bp), c # subtask return status + mov c, 4(sp) + test $0x7f, c # subtask ended by exit ? + jnz 2f # no, killed + and $0xff00, c # extract subtask rc + jz 1f # rc = 0 + movl $1, bad(bp) # non zero rc, turn on BAD switch +1: shr $8, c + mov c, 4(sp) # rc + mov a, (sp) # pid + LOG 4, "node process %u ended by exit(%d)" + jmp Cn.iterateOnWait # continue waiting for other subtasks +2: movl $1, bad(bp) # subtask killed, turn on BAD switch + incl killed(bp) # counter of killed + mov c, 4(sp) # status + mov a, (sp) # pid + LOG 4, "node process %u killed, status=0x%x" + jmp Cn.iterateOnWait # continue waiting for other subtasks + +# opers of all nodes finished +Cn.allFinished: + push killed(bp) + push catched(bp) + mov thisP(bp), b + push Cn.nodes(b) + cmp $0, bad(bp) # all nodes ended OK ? + je 0f # yes + push $7f + jmp 9f +0: push $8f + jmp 9f +7: .ascii "with ERROR\0" +8: .ascii "OK\0" +9: LOG 1, "ENDED %s, %u spawned, %u catched, %u killed" +Cn.ret: + push bad(bp) + call exit +#----------------------------------------------- +# G E T I N T V A L U E S F R O M E N V +#----------------------------------------------- + ARGS + DS key # -->env key string +# returns int value or 0 if not found + PROLOC + EPILOC +#----------------------------------------------- +C.getArg: + PROLOG + pushl key(bp) # -->env key string + call getenv # get value + test a, a + jz 0f # not found in ENV + push a + call atoi # convert to int +0: + EPILOG_R +#----------------------------------------------- +# A B N O R M A L E N D +#----------------------------------------------- + ARGS + DS deP # -->Debug + PROLOC + EPILOC +#----------------------------------------------- + .globl C.abend +C.abend: + PROLOG + LOG 0, "ABEND" + push $15 # SIGTERM + push $0 # kill all +# call kill + SYS kill + push $1 + call exit +#----------------------------------------------- + .end diff -r 000000000000 -r 5c129dd80d4f CSa32/Constellation.S --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSa32/Constellation.S Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,155 @@ + .include "DS.S" +#----------------------------------------------- +# C O N S T E L L A T I O N O P E R A T I O N S +#----------------------------------------------- + ARGS + DS ssl # ssl switch + DS topo # topology +# returns: nothing + PROLOC + DL this, ConstellationL # this Constellation instance + DL thisP # -->this Constellation + DL deP # -->Debug + DL len + DL pid + DL stat + DL bad # "some node BAD" exit indicator + EPILOC +#----------------------------------------------- + .global Constellation +Constellation: + PROLOG + lea this(bp), b # -->this Constellation + mov b, thisP(bp) # save -->this Constellation +# lea this, b # -->this inst +# set debid + lea Cn.debug(b), a # -->Debug + mov a, deP(bp) # save -->Debug locally + mov C.csP, c # -->CS + cmp $Cn.ring, topo(bp) # ring topology ? + je 0f + mov C.mn(c), a # num. of nodes + mov a, Cn.nodes(b) + mov C.mp0(c), a # port # of fist node + mov a, Cn.first(b) + movl $Cn.mash, Cn.topo(b) + push $8f + jmp 1f +0: mov C.rn(c), a # num. of nodes + mov a, Cn.nodes(b) + mov C.rp0(c), a # port # of fist node + mov a, Cn.first(b) + movl $Cn.ring, Cn.topo(b) + push $7f +1: cmp $0, ssl(bp) # SSL ? + jz 2f # no + movl $1, Cn.ssl(b) + addl $500, Cn.first(b) # first SSL port # + push $6f + jmp 9f +2: movl $0, Cn.ssl(b) + push $5f + jmp 9f +5: .ascii "non\0" +6: .ascii "\0" +7: .ascii "RING\0" +8: .ascii "MASH\0" +9: DEBID "%sSSL %s", 2 +# check # of nodes + movL $0, bad(bp) + cmp $1, Cn.nodes(b) # num of nodes + jl ConstellationR # < 1 ? nothing to do + jg 0f + LOG 0, "1 node configuration not implemented yet" + jmp ConstellationR +0: LOG 5, "initializing..." +# determine divisor for random next node choise + mov $1, a + shl $31, a + not a # MAX_INT + xor d, d + divl Cn.nodes(b) + mov a, Cn.div(b) # save divisor (MAX_INT / nodes) +# allocate "forward" indicator shared by nodes in constellation + push $0 + push $-1 + push $0x21 # PROT_READ | PROT_WRITE + push $0x03 # MAP_SHARED | MAP_ANONYMOUS + push $4 + push $0 + call mmap + cmp $-1, a + jne 0f + SYSERR "mmap" +0: mov thisP(bp), b + mov a, Cn.forwP(b) # save -->forw + movl $1, (a) # enable forwarding + mov C.csP, c # -->CS + push C.ttl(c) + push Cn.nodes(b) + LOG 1, "%d node(s), ttl=%d starting..." +# start processes for all nodes in constellation + mov Cn.first(b), d # first node# + mov d, c + add Cn.nodes(b), c # last node + 1 +0: pusha + call fork + cmp $0, a + jnz 1f # parent + popa + push d # node's port# + push b # -->Cnstlln + call Node +1: mov a, pid(bp) + popa + push pid(bp) # nodes's pid + push d # node's port# + LOG 3, "node %u established in process %u", 2 + inc d + cmp d, c # last node ? + jg 0b # no, continue forking +# wait for completion of node processes + LOG 5, "all nodes established, waiting for them to terminate..." + movl $0, bad(bp) +0: lea stat(bp), a + cmp $0, bad(bp) # status still OK ? + je 1f # yes + mov Cn.forwP(b), a # -->forward switch + movl $0, (a) # disable forwarding +1: mov a, (sp) # -->return status of task + call wait + mov thisP(bp), b + cmp $0, a # normal return from wait? + jl 3f # no, all subtasks finshed + mov a, pid(bp) # save subtask's pid + mov stat(bp), a + test $0x7f, a # subtask ended by exit ? + jnz 2f # no, killed + and $0xff00, a # extract subtask rc + jz 1f # rc = 0 + movl $1, bad(bp) # non zero rc, turn on BAD switch +1: push a # rc + push pid(bp) # pid + LOG 4, "node process %u ended with exit(%d)" + jmp 0b # continue waiting for other subtasks +2: movl $1, bad(bp) # subtask killed, turn on BAD switch + push pid(bp) + LOG 4, "node process %u killed" + jmp 0b # continue waiting for other subtasks + +# opers of all nodes finished +3: cmp $0, bad(bp) # all nodes ended OK ? + je 0f # yes + push $7f + jmp 9f +0: push $8f + jmp 9f +7: .ascii "with ERROR\0" +8: .ascii "OK\0" +9: LOG 1, "ENDED %s" +ConstellationR: + push bad(bp) + call exit +#----------------------------------------------- + .end + diff -r 000000000000 -r 5c129dd80d4f CSa32/DS.S --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSa32/DS.S Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,514 @@ + .equ a, %eax + .equ b, %ebx + .equ c, %ecx + .equ d, %edx + .equ bp, %ebp + .equ sp, %esp + .equ di, %edi +#----------------------------------------------- +# M A C R O D E C L A R A T I O N S +#----------------------------------------------- +# subroutines list + .macro SUBRLIST + SUBR _start + SUBR D.init + SUBR D.setId + SUBR D.log + SUBR D.getTs + SUBR D.subr + SUBR Node + SUBR N.bind + SUBR N.conn + SUBR N.acc + SUBR N.closeSocket + SUBR N.read + SUBR N.write + SUBR N.get + SUBR N.put + SUBR N.sslErr + SUBR Data + SUBR Da.load + SUBR Da.chk + SUBR Da.unl + SUBR Da.dttl + SUBR Da.ttl + SUBR Da.getTs + .endm +#----------------------------------------------- +# push subroutine addr, prints its name and addr +# leave subr name and addr on stack + .macro SUBR subr:req + push $\subr + push $9f + push $8f + push stderr + call fprintf + jmp 7f +9: .asciz "\subr" +8: .asciz "%-12s\t%p\n" +7: lea 8(sp), sp + .endm +#----------------------------------------------- +# prepare to define subroutine args; use macro DS to define args + .macro ARGS + ac = 40 + .endm +#----------------------------------------------- +# prepare to define subroutine args; use macro DS to define args + .macro M_ARGS + ac = 40 + DS thisP + .endm +#----------------------------------------------- +# declare storage + .macro DS id:req, len=4 + \id = ac + ac = ac + \len + .endm +#----------------------------------------------- +# prepare to define subroutine local vars + .macro PROLOC + ac = 0 + .endm +#----------------------------------------------- +# prepare to define subroutine local vars + .macro M_LOCAL + ac = 0 + DL deP + .endm +#----------------------------------------------- +# declare local var + .macro DL id:req, len=4 + ac = ac + \len + \id = -ac + .endm +#----------------------------------------------- +# terminate subroutine local vars definition + .macro EPILOC + locL = ac + .endm +#----------------------------------------------- +# subroutine prolog +# expects length of local storage under the name "locL" + .macro PROLOG + pusha + enter $locL, $0 + .endm +#----------------------------------------------- +# subroutine prolog +# expects length of local storage under the name "locL" + .altmacro + .macro M_PROLOG m_prefix:req, m_name:req + .global \m_prefix\().\m_name +\m_prefix\().\m_name: + pusha + enter $ac, $0 + mov thisP(bp), b + lea \m_prefix\().debug(b), a + mov a, deP(bp) + .endm + .noaltmacro +#----------------------------------------------- +# subroutine epilog without return value + .macro EPILOG + leave + popa + ret + .endm +#----------------------------------------------- +# subroutine epilog with return value in EAX +# expects length of local storage under the name "locL" + .macro EPILOG_R + leave + mov a, 28(sp) + popa + ret +# mov -36(sp), a + .endm +#----------------------------------------------- +# ABEND at system routine err + .macro ERR causer:req + pushl $9f + call printf + pushl $1 + call exit +9: .asciz "\causer: %m\n" + .endm +#----------------------------------------------- +# ABEND at system routine err using debug structure +# expects deP(bp)-->Debug inst + .macro SYSERR causer:req + push $9f # -->format + push $0 # no. of values + push $0 # msg debug level + pushl deP(bp) # -->Debug instance + call D.log # print msg + call C.abend +9: .asciz "\causer: %m" + .endm +#----------------------------------------------- +# print SSL err msg queue and then abend +# -->Debug in deP(bp) is expected + .macro SSLERR msg:req + pushl $9f + pushl deP(bp) + call N.sslErr +9: .asciz "\msg" + .endm +#----------------------------------------------------------- + .macro SYS name:req + call \name + cmp $-1, a + jne 8f + SYSERR "\name" +8: mov thisP(bp), b + test a, a + .endm +#----------------------------------------------- +# get integer value from ENV + .macro GETINTENV key:req + pushl $9f + call C.getArg + lea 4(sp), sp + cmp $0, a + jmp 8f +9: .asciz "\key" +8: + .endm +#----------------------------------------------- +# set local debug ID +# supposed: local deP-->Debug +# expected on the stack: +# \argc values to be put in debug ID according to format +# \argc may be zero + .macro DEBID format:req, argc=0 + push $9f # -->format + push $\argc # no. of values + pushl deP(bp) # -->Debug instance + call D.setId # put debug ID to Debug inst + jmp 8f +9: .asciz "\format" +8: lea 12(sp), sp + .endm +#----------------------------------------------- +# debug output +# supposed: local deP-->Debug +# expected on the stack: +# \argc values to be printed by \format at debug level \level +# \argc may be zero +# all regs are preserved + .macro LOG level:req, format:req, argc=7 + push $9f # -->format + push $\argc # no. of values + push $\level # msg debug level + pushl deP(bp) # -->Debug instance + call D.log # print msg + jmp 8f +9: .asciz "\format" +8: lea 16(sp), sp + .endm +#----------------------------------------------- +# flat print +# sp, bp are preserved + .macro PR format:req + push $9f + push stderr + call fprintf + jmp 8f +9: .asciz ">>> \format\n" +8: lea 8(sp), sp + .endm +#----------------------------------------------- +# print ip, bp, sp before subroutine call +# to be placed directly before a call instruction +# expected deP(bp) as -->Debug +# all regs are preserved + .macro B_CALL level:req, label="BEFORE SUBR CALL " + push a + lea 4(sp), a + push a + push bp + pushl $7f+5 + LOG \level, "\label: ip: %p, bp: %p, sp: %p", 3 + lea 12(sp), sp + pop a + jmp 7f +7: + .endm +#----------------------------------------------- +# print ip, bp, sp saved in sburoutine's stack frame +# to be placed anywhere before return seq and after deP(bp) is set +# expected deP(bp) as -->Debug +# all regs are preserved + .macro B_RET level:req, label="BEFORE SUBR RETURN" + push a + lea 8(bp), a + push a + push (bp) + push 4(bp) + LOG \level, "\label: ip: %p, bp: %p, sp: %p", 3 + lea 12(sp), sp + pop a + jmp 7f +7: + .endm +#----------------------------------------------- +# print ip, bp, sp retained for return from sburoutine +# all regs are preserved + .macro A_CALL label="AFTER CALL" + pusha + lea 8(bp), a + push a + push (bp) + push 4(bp) + pushl $9f + call printf + lea 16(sp), sp + popa + jmp 8f +9: .asciz "\label: ip: %p, bp: %p, sp: %p\n" +8: + .endm +#----------------------------------------------- + .macro SLEEPER time:req, ident=">>>" + pusha + LOG 7, "\ident: sleeping \time" + pushl $\time + call sleep + lea 4(sp), sp + LOG 7, "\ident: woken up" + popa + .endm +#----------------------------------------------- +# A B S T R A C T D A T A D E C L A R A T I O N S +#----------------------------------------------- +# rSA reg save area + ac = 0 + DS retSA + DS regSA, 32 + DS bpSA + rSAL = ac +#----------------------------------------------- +# Timeval + ac = 0 + DS secs + DS usecs + timevalL = ac +#----------------------------------------------- +# AddrInfo IP family net addr block + ac = 0 + DS ai_flags + DS ai_family + DS ai_socktype + DS ai_protocol + DS ai_addrlen + DS ai_addrP # -->sockaddr + DS ai_canonnameP + DS ai_nextP + AddrInfoL = ac +#----------------------------------------------- + SIGUSR2 = 12 + SIG_IGN = 1 + SIG_UNBLOCK = 1 +# sigaction signal handler definition + ac = 0 + DS sa_handler + DS sa_mask, 128 + DS sa_flags + DS sa_unused + SigActionL = ac +#----------------------------------------------- +# SockAddr + ac = 0 + DS sa_family, 2 + DS sa_data, 14 + SockAddrL = ac +#----------------------------------------------- +# timeval struct + ac = 0 + DS Ti.secs + DS Ti.usecs + timevalL = ac +#----------------------------------------------- +# Debug debug info + ac = 0 + D.idL = 128 + DS D.id, D.idL # debug ID of process + D.msgL = 256 + DS D.msg, D.msgL # debug msg workspace + DebugL = ac + + .macro DebugA # returns -->Debug + push $DebugL + call malloc + cmp $0, a + ja 8f + SYSERR "malloc" +8: movl $0, D.id(a) + .endm +#----------------------------------------------- +# CS top level attributes + ac = 0 + DS C.debug, DebugL + DS C.debMaxLev # max level of debug msgs to be printed + DS C.txtP # -->text to be sent in messages + DS C.ttl # TTL for circulating msgs + DS C.mp0 # TCP port of first mash node + DS C.mn # intended # of nodes in mash + DS C.rp0 # TCP port of first ring node + DS C.rn # intended # of nodes in ring + DS C.pace.tv_sec # timespec.tv_sec + DS C.pace.tv_nsec # timespec.tv_nsec + DS C.pacing # pacing indicator +# DS C.rs # random() seed + DS C.ssl # ssl mask: 01B=noSSL, 10B=SSL, 11B=both + DS C.connTh # connection retry threshhold + DS C.shP # -->shared counters + DS C.pathP # -->pathname to application home dir + DS C.cePathP # -->pathname to SSL dir + DS C.caPathP # -->pathname to SSL CA CERT dir + CSL = ac + + .macro CSA # returns -->CS + push $CSL + call malloc + cmp $0, a + ja 8f + SYSERR "malloc" +8: + .endm +#----------------------------------------------- +# Share counters shared between procs or threads + ac = 0 + DS S.counter_sem, 16 # semaphore for counters + DS S.conns # overall connections counter both in ring and mash + DS S.msgs # overall forewards# both in ring and mash + DS S.act # active node processes counter + DS S.mash_open_client_count # no. of opened clients in mash + DS S.mash_open_SSL_client_count # no. of opened SSL clients in mash + ShareL = ac + + .macro ShareA # returns -->Share + push $0 + push $-1 + push $0x21 # PROT_READ | PROT_WRITE + push $0x03 # MAP_SHARED | MAP_ANONYMOUS + push $ShareL + push $0 + call mmap + cmp $-1, a + jne 8f + SYSERR "mmap" +8: + .endm +#----------------------------------------------- +# Data - container for data sent through connection topology +# Container Header + ac = 0 + DS H.ttl # msg TTL + DS H.ts # timestamp + DS H.lport # listening TCP port + HeaderL = ac +# Container Payload + ac = 0 +# Pa.loadL = 256 + DS Pa.ts # timestamp + DS Pa.text, 0 # load sent in msg + PayloadL = ac +# Container to be send + ac = 0 + DS Co.hdr, HeaderL # Header + DS Co.payl, PayloadL # Payload +# ContainerL = ac +# Data instantion + ac = 0 + DS Da.debug, DebugL # Debug + DS Da.contP # -->Container + DS Da.datalen # container length + DataL = ac + + .macro DataA # returns -->Data + push $DataL + call malloc + cmp $0, a + ja 8f + SYSERR "malloc" +8: + .endm +#----------------------------------------------- +# Node general attributes of node + ac = 0 + DS N.debug, DebugL # debug info + DS N.cnstlnP # -->Constellation block + DS N.topo # constellation topology + DS N.locPort # TCP port node binds to + DS N.first # port # of first node in constellation + DS N.last # port # of last node in constellation + DS N.nodes # number of nodes + DS N.div # random node choise divisor (MAX_INT / nodes) + DS N.kicker # kicker indicator + DS N.forwP # -->shared forward indicator + DS N.closing # closing in progress indicator + DS N.ssc # server side socket + DS N.data, DataL # data block + DS N.dataP # -->data block + DS N.srvSideP # -->array of server side Sockets + DS N.cliSideP # -->array of client side Sockets + DS N.sockArrLen # socket array length + DS N.next # next node's port# + DS N.nfds # highest FD# in FD sets + DS N.rs, 128 # read FD set + DS N.es, 128 # exceptional FD set + DS N.t, timevalL # timeval for select + DS N.ssl # SSL switch: 0=noSSL, 1=SSL + DS N.ctxP # -->SSL context + DS N.pid # process ID + DS N.ptid # closing thread ID + NodeL = ac + + .macro NodeA # returns -->Node + push $NodeL + call malloc + cmp $0, a + ja 8f + SYSERR "malloc" +8: + .endm +#----------------------------------------------- +# SocketInfo comm socket info + ac = 0 + DS So.remPort # TCP port on remote site + DS So.sc # comm socket# + DS So.sslP # -->SSL structure + SocketInfoL = ac + + .macro SocketA # returns -->Socket + push $SocketL + call malloc + cmp $0, a + ja 8f + SYSERR "malloc" +8: + .endm +#----------------------------------------------- +# Constellation attributes of constellation of nodes (mash or ring) + ac = 0 + DS Cn.debug, DebugL # debug info + DS Cn.topo # Constellation topology + Cn.ring = 0 + Cn.mash = 1 + DS Cn.first + DS Cn.nodes + DS Cn.div # random node choise divisor (MAX_INT / nodes) + DS Cn.ssl + DS Cn.forwP + ConstellationL = ac + + .macro ConstellationA # returns -->Constellation + push $ConstellationL + call malloc + cmp $0, a + ja 8f + SYSERR "malloc" +8: + .endm diff -r 000000000000 -r 5c129dd80d4f CSa32/Data.S --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSa32/Data.S Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,270 @@ + .include "DS.S" + .text +#----------------------------------------------- +# C O N S T R U C T O R +#----------------------------------------------- + ARGS + DS thisP # -->Data inst + DS callerDeP # -->originator's Debug + PROLOC + DL deP # -->Debug + EPILOC +#----------------------------------------------- + .global Data +Data: + PROLOG + mov thisP(bp), b # -->Data inst + lea Da.debug(b), a # -->Debug + mov a, deP(bp) # init local -->Debug +# construct debug ID from originator debID + mov callerDeP(bp), a # -->caller's Debug + lea D.id(a), a + push a # -->caller's DebugId + DEBID "%s DATA", 1 +# calculate container size + mov C.csP,a + push C.txtP(a) + call strlen + addl $HeaderL, a + addl $PayloadL, a + inc a + mov thisP(bp), b # -->Data inst + mov a, Da.datalen(b) # save container length +# allocate container + push a + call malloc + test a, a + jnz 0f + SYSERR "malloc" +0: + mov thisP(bp), b # -->Data inst + mov a, Da.contP(b) # save -->container +# initialize container + mov Da.contP(b), c # -->Container + lea Co.hdr(c), a # -->Header + movl $0, H.ttl(a) + movl $0, H.ts(a) + push $0f + lea Co.payl(c), a # -->Payload + lea Pa.text(a), a + push a + call strcpy # init Payload.text + LOG 5, "Data instance established" + EPILOG +0: .asciz "EMPTY" +#----------------------------------------------- +# L O A D P A Y L O A D +#----------------------------------------------- +# returns -->data container + M_ARGS + DS ttl + DS loadP # -->text to be sent + M_LOCAL + DL t, timevalL # timestamp buf + M_PROLOG Da, load + LOG 5, "loading..." + mov Da.contP(b), c # -->Container +# fill header + lea Co.hdr(c), d # -->containner header + mov ttl(bp), a + mov a, H.ttl(d) # init TTL in header +# get timestamp + call D.getTs # get timestamp + mov a, H.ts(d) # timestamp to header +# init payload + push loadP(bp) # -->loaded text + lea Co.payl(c), a # -->Payload + lea Pa.text(a), a + push a + call strcpy # copy text to payload +# return -->Data + push Da.datalen(b) + LOG 5, "payload loaded to container, len=%u" + mov Da.contP(b), a # return -->container + EPILOG_R +#----------------------------------------------- +# G E T D A T A C O N T A I N E R L E N G T H +#----------------------------------------------- +# returns length of container filled with data + M_ARGS + M_LOCAL + M_PROLOG Da, getDataLen + mov Da.datalen(b), a + EPILOG_R +#----------------------------------------------- +# G E T D A T A C O N T A I N E R P O I N T E R +#----------------------------------------------- + M_ARGS + M_LOCAL + M_PROLOG Da, getContP + mov Da.contP(b), a + EPILOG_R +#----------------------------------------------- +# C H E C K P A Y L O A D +#----------------------------------------------- +# returns boolean: original text == received text + M_ARGS + M_LOCAL + M_PROLOG Da, chk +# get original payload + mov C.csP, c # -->CS + pushl C.txtP(c) # -->orig text +# get payload from container + mov Da.contP(b), a # -->Container + lea Co.payl(a), a # -->Payload + lea Pa.text(a), a # -->payload text + push a + call strcmp + xor c, c + test a, a # 0 = texts are equal + setz %cl + mov c, a + EPILOG_R +#----------------------------------------------- +# U N L O A D P A Y L O A D +#----------------------------------------------- +# returns -->payload text + M_ARGS + M_LOCAL + M_PROLOG Da, unl + mov thisP(bp), b # -->Data inst + lea Da.debug(b), a + mov a, deP(bp) +# get payload from container + mov Da.contP(b), a # -->Container + lea Co.payl(a), a # -->Payload + lea Pa.text(a), a # -->Payload.text + EPILOG_R +#----------------------------------------------- +# D E C R E M E N T T T L +#----------------------------------------------- +# decrement TTL in data container header, save it and return it + M_ARGS + M_LOCAL + M_PROLOG Da, dttl + mov Da.contP(b), a # -->Container + lea Co.hdr(a), a # -->container header + decl H.ttl(a) # TTL-- + mov H.ttl(a), a # return TTL + EPILOG_R +#----------------------------------------------- +# G E T T T L +#----------------------------------------------- + M_ARGS + M_LOCAL + M_PROLOG Da, ttl + mov Da.contP(b), a # -->Container + lea Co.hdr(a), a # -->container header + mov H.ttl(a), a # return TTL + EPILOG_R +#----------------------------------------------- +# P U T l I S T E N P O R T T O H E A D E R +#----------------------------------------------- + ARGS + DS thisP # -->Data inst + DS port +# returns port value + PROLOC + DL deP # -->Debug + EPILOC +#----------------------------------------------- + .global Da.putPort +Da.putPort: + PROLOG + mov thisP(bp), b # -->Data inst + lea Da.debug(b), a + mov a, deP(bp) # -->Debug + + mov Da.contP(b), c # -->Container + lea Co.hdr(a), c # -->container header + mov port(bp), a + mov a, H.lport(c) + EPILOG_R +#----------------------------------------------- +# G E T l I S T E N P O R T F R O M H E A D E R +#----------------------------------------------- + M_ARGS + M_LOCAL + M_PROLOG Da, getPort + mov Da.contP(b), c # -->Container + lea Co.hdr(c), c # -->container header + mov H.lport(c), a + EPILOG_R +#----------------------------------------------- +# G E T T I M E S T A M P F R O M H E A D E R +#----------------------------------------------- + M_ARGS + M_LOCAL + M_PROLOG Da, getTs + mov Da.contP(b), a # -->Container + lea Co.hdr(a), a # -->container header + mov H.ts(a), a # return timestamp + EPILOG_R +#----------------------------------------------- +# S A B O T A G E T E X T +#----------------------------------------------- + ARGS + DS thisP # -->Data inst + PROLOC + DL deP # -->Debug + EPILOC +#----------------------------------------------- + .global Da.sabotage +Da.sabotage: + PROLOG + lea Da.contP(b), a # -->Container + lea Co.payl(a), a # -->Payload + lea Pa.text(a), a # -->Payload.text + movl $'?', (a) + EPILOG +#----------------------------------------------- +# C R E A T E D I G E S T F R O M T E X T +#----------------------------------------------- +# returns -->24 chars payload text digest + M_ARGS + DS digestP # -->text digest buffer + M_LOCAL + DL textP + M_PROLOG Da, digest24 +# check length of payload text + mov Da.contP(b), a # -->Container + lea Co.payl(a), a # -->Payload + lea Pa.text(a), a # -->Payload.text + mov a, textP(bp) + push a + call strlen + cmp $24, a # payload text length < 24 ? + jl Da.digest24Direct # yes, direct copy +# create digest from longer text + push $8 + push textP(bp) + push digestP(bp) + call strncpy # copy beg. of text + push $0f + mov digestP(bp), a # -->digest + add $8, a + push a # -->digest+8 + call strcpy + jmp 1f +0: .asciz "-------" +1: push textP(bp) + call strlen + mov textP(bp), c + add a, c + lea -8(c), c # -->end of text - 8 + push c + mov digestP(bp), a + lea 15(a), a # -->digest+15 + push a + call strcpy + jmp Da.digest24Ex +# directly copy shorter text +Da.digest24Direct: + push textP(bp) + push digestP(bp) + call strcpy +Da.digest24Ex: + mov digestP(bp), a # -->digest + EPILOG_R +#----------------------------------------------- + .end diff -r 000000000000 -r 5c129dd80d4f CSa32/Debug.S --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSa32/Debug.S Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,273 @@ + .include "DS.S" +#----------------------------------------------- +# A B S T R A C T S T R U C T U R E S +#----------------------------------------------- +#----------------------------------------------- +# S T A T I C V A R I A B L E S +#----------------------------------------------- + .data +D.t0: .int 0 # initial timestamp in usecs +D.semP: .int 0 # -->semaphore for debug output + .globl D.prgNameP +D.prgNameP: + .int 0 # -->prog name +#----------------------------------------------- +# D E B U G I N I T +#----------------------------------------------- +# initialize static debug vars + ARGS + DS prgP # -->program name +# returns: nothing + PROLOC + EPILOC +#----------------------------------------------- + .text + .global D.init +D.init: + PROLOG + push prgP(bp) + call strlen + inc a + push a + call malloc + mov a, D.prgNameP # save -->prog name for debugging + push prgP(bp) + push a + call strcpy + movl $0, D.t0 + call D.getTs # get actual time - t0 = actual time + mov a, D.t0 # set t0 to actual time + + jmp 3f + push $0 + push $t + call gettimeofday # get initial timestamp t0 + cmp $0, a + jz 0f + ERR "gettimeofday" +0: lea t, b # compute (1000000*secs+usecs) + mov secs(b), c + mov $1000000, a + mul c # 1000000*secs + add usecs(b), a # 1000000*secs+usecs + movl a, D.t0 # save low 32 bits of timestamp in t0 +3: + push $0 # allocate semaphore for debug output + push $-1 + push $0x21 # PROT_READ | PROT_WRITE + push $0x03 # MAP_SHARED | MAP_ANONYMOUS + push $16 # semaphore size + push $0 + call mmap # get shared storage for semaphore + cmp $-1, a + jne 0f + ERR "mmap" +0: mov a, D.semP # save -->semaphore + + push $1 # initial semaphore value + push $1 # semaphore shared between processes + pushl D.semP + call sem_init # initialize semaphore + cmp $0, a + jz 0f + ERR "sem_init of debug semaphore" +0: + EPILOG +#----------------------------------------------- +# S E T D E B U G I D +#----------------------------------------------- +# sets debug ID according to format + ARGS + DS deP # -->Debug instance + DS argc # # of values to be put into deb ID + DS formP # -->format +# ... # values +# returns: nothing + PROLOC + EPILOC +#----------------------------------------------- + .global D.setId +D.setId: + PROLOG +# format debug ID + mov argc(bp), c # no. of values +0: mov formP(bp,c,4), a # push values and format + push a + dec c + jns 0b # iterate on values + pushl $D.idL # max deb ID len + mov deP(bp), b # -->Debug instance + lea D.id(b), a # -->DebId + push a + call snprintf # construct DebId + cmp $0, a + jnl 0f + ERR "DEBID snprintf" +0: + LOG 6, "debid '%s' set", 1 + EPILOG +#----------------------------------------------- +# P R I N T D E B U G M S G +#----------------------------------------------- +# outputs to stderr the synchronized debug msg with time diff from program start +# on appripriate debug level, flushes stderr + ARGS + ac = 40 + DS deP # -->Debug instance + DS level # msg debug level + DS argc # # of values to be used in msg + DS form # -->format +# ... # values +# returns: nothing + PROLOC + ac = 0 + EPILOC +#----------------------------------------------- + .global D.log +D.log: + PROLOG + + mov C.csP, c + mov level(bp), a # msg debug level + cmp C.debMaxLev(c), a # max debug level >= msg debug level ? + jg 3f # nothing to print, leave + cmp $7, C.debMaxLev(c) # on max debug level = 7 print level 7 only + jne 0f + cmp $7, a # msg debug level = 7 ? + jne 3f +# prepare output string of msg +0: mov argc(bp), c # no. of values +0: mov form(bp,c,4), a # copy values and format + push a + dec c + jns 0b + push $D.msgL # msg buffer len + mov deP(bp), b # -->Debug instance + lea D.msg(b), a # -->msg buffer + push a + call snprintf # construct msg in buffer + cmp $0, a + jnl 0f + ERR "LOG snprintf" +# synchronize with other tasks +0: pushl D.semP + call sem_wait # synchronize + cmp $0, a + jz 0f + ERR "LOG sem_wait" +# get timestamp +0: call D.getTs # time from beg of program in usecs +# print debug msg + mov deP(bp), b # -->Debug instance + lea D.msg(b), c # -->msg text + push c + lea D.id(b), c # -->instance debug ID + push c + push a # tdiff in usecs + push $1f + push stderr + call fprintf + jmp 2f +1: .ascii "%09u %s: %s\n\0" +# flush stderr +2: push stderr + call fflush # flush stderr +# free semaphore + pushl D.semP + call sem_post # free semaphore + cmp $0, a + jz 3f + ERR "LOG sem_post" +3: + EPILOG +#----------------------------------------------- +# G E T T I M E S T A M P I N U S E C S +#----------------------------------------------- +# returns timestamp in usecs from beg of program +# returns: int value + PROLOC + DL t, timevalL # timestamp buffer + EPILOC +#----------------------------------------------- + .global D.getTs +D.getTs: + PROLOG + push $0 + lea t(bp), b # -->timeval buffer + push b + call gettimeofday +# compute time delta from program start = t-t0 in usecs +# tdiff = (1000000*ts.secs+ts.usecs)-t0 + lea t(bp), b # actual timestamp + mov secs(b), c # t.secs + mov $1000000, a + mul c # 1000000*t.secs + add usecs(b), a # 1000000*t.secs+t.usecs + sub D.t0, a # 1000000*t.secs+t.usecs-t0 + EPILOG_R +#----------------------------------------------- +# S U B R O U T I N E S L I S T +#----------------------------------------------- +# print subroutines names and addresses +# localize address from env ADDR= in subroutine + PROLOC + DL wAddr # wanted addr + EPILOC +#----------------------------------------------- + .global D.subr +D.subr: + PROLOG + movl $0, wAddr(bp) + pushl $0f + call getenv # get env ADDR + lea 4(sp), sp + jmp 1f +0: .ascii "ADDR\0" +1: cmp $0, a + jz 0f # no addr wanted + pushl $16 + pushl $0 + push a + call strtol # convert str to hex + lea 12(sp), sp + mov a, wAddr(bp) # searched addr +0: + pushl $0 # end of list + SUBRLIST # print and push list of subroutines + cmp $0, wAddr(bp) # address wanted ? + je D.subrR + mov wAddr(bp), d + mov sp, b # -->beg of subr list + xor a, a + jmp 1f +0: lea 8(b), b +1: cmpl $0, (b) # end of list ? + je 2f # yes + cmp 4(b), d # -->subr > wanted addr ? + jb 0b # yes, next + cmp 4(b), a # -->subr > last found ? + jnb 0b # no, next + mov (b), c # get subr name + mov 4(b), a # get subr addr + jmp 0b # next +2: test a, a + jnz 1f + push d + push $0f + push stderr + call fprintf + jmp D.subrR +0: .ascii "addr %p not found in subroutines\n\0" +1: sub a, d + push d # offset in subr + push c # subroutine name + pushl wAddr(bp) # wanted addr + push $0f + push stderr + call fprintf + jmp D.subrR +0: .ascii "searched addr %p in subroutine %s at offset %x\n\0" +D.subrR: + EPILOG +#----------------------------------------------- + .end diff -r 000000000000 -r 5c129dd80d4f CSa32/Makefile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSa32/Makefile Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,17 @@ +O := CS.o Debug.o Node.o Data.o +H := DS.S + +ASFLAGS := -Wall --gstabs -am --32 +LDFLAGS := -m elf_i386 -lc -lm -lpthread -lrt -lssl -lcrypto + +.PHONY: all clean +all: CS + +CS: $(O) Makefile + ld $(LDFLAGS) $(O) -o $@ + +%.o: %.S $(H) Makefile + as $(ASFLAGS) -a=$*.lst -o $@ $< + +clean: + rm -f CS $(O) diff -r 000000000000 -r 5c129dd80d4f CSa32/Node.S --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSa32/Node.S Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,1616 @@ + .include "DS.S" + .data + .text +#----------------------------------------------- +# N O D E O P E R A T I O N S +#----------------------------------------------- + ARGS + DS CnstlnP # -->Constellation inst + DS port # node's port# +# returns +# nothing +# local vars + PROLOC + DL this, NodeL # this Node instance + DL thisP # -->this Node + DL deP # -->Debug inst for LOG macro + DL rc + DL tosignal # switch + DL sigact, SigActionL + DL sigmask, 128 + DL sigret + EPILOC +#----------------------------------------------- + .global Node +Node: + PROLOG + lea this(bp), b # -->this Node + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save -->Debug locally for LOG macro +# save parm values + mov CnstlnP(bp), c # -->Constellation + mov c, N.cnstlnP(b) # save -->Constellation + mov port(bp), a + mov a, N.locPort(b) # save port# +# set Debug id + push a + cmpl $Cn.ring, Cn.topo(c) + je 0f + push $8f + jmp 1f +0: push $7f +1: cmp $0,Cn.ssl(c) + je 2f + push $6f + jmp 9f +2: push $5f + jmp 9f +5: .asciz "non" +6: .asciz "" +7: .asciz "RING" +8: .asciz "MASH" +9: DEBID "%sSSL %s node %d", 3 # N->ssl ? "" : "non", N.topo==mash ? "MASH" : "RING", locPort + call N.init # initialize node + call N.bind # bind node to local port + push N.kicker(b) # kicker indicator + LOG 5, "kicker=%d" + cmpl $0, N.kicker(b) # kicker ? + je 0f # no + call N.kickOff # start transfer +0: + call N.mainLoop # iterate on socket I/O select + LOG 5, "closing ssc" + push N.ssc(b) + SYS close + movl $0, rc(bp) # rc = OK +# wait on thread closing client side + cmpl $0, N.closing(b) # closing client side ? + je N.chkData # no + push $0 + push N.ptid(b) # thread id + call pthread_join + test a, a + jz 1f + push a # unique error handlig for pthread_join + call strerror + push a + LOG 0, "pthread_join: %s" + push deP(bp) + call C.abend +1: + mov thisP(bp), b # -->this Node + push N.pid(b) + LOG 5, "task %u: closing thread returned" +# synchronize with all nodes in all constellations and then check received data +N.chkData: + call N.sync # wait for others + cmpl $0, N.kicker(b) # kicker ? + je 0f # no + + push N.dataP(b) # -->Data + call Da.chk + test a, a # data check OK ? + jnz 0f # yes + movl $1, rc(bp) # rc = BAD + LOG 0, "data on INPUT TO and OUTPUT FROM constellation DIFFER" +0: + push N.pid(b) + push rc(bp) + LOG 2, "end of operations, rc=%d, process=%u" + call exit +#----------------------------------------------- +# C O N S T R U C T O R +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + DL deP # -->Debug +# DL CnstlnP # -->Constellation + DL len # various lengths + strL = 128 + DL str, strL # string buf + EPILOC +#----------------------------------------------- +N.init: # c: -->Cnstln + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + LOG 4, "initializing..." +# initialize node's attributes + call getpid + mov a, N.pid(b) + mov N.cnstlnP(b), c # -->Constellation + push deP(bp) # -->caller Debug id + lea N.data(b), a + mov a, N.dataP(b) + push a # -->Data + call Data # initialize data item + mov Cn.topo(c), a + mov a, N.topo(b) # save topology + mov Cn.ssl(c), a + mov a, N.ssl(b) # SSL switch + mov Cn.first(c), a + mov a, N.first(b) # save port# of first node in constellation + movl $0, N.kicker(b) # clear kicker switch + cmp a, N.locPort(b) # kicker ? (locPort == first) + sete N.kicker(b) +1: add Cn.nodes(c), a + dec a + mov a, N.last(b) # save port# of last node (last = first + nodes -1) + mov Cn.nodes(c), a + mov a, N.nodes(b) # save num. of nodes in constellation + mov Cn.div(c), a + mov a, N.div(b) # save divisor for random choice (MAX_INT / nodes) + mov Cn.topo(c), a + mov a, N.topo(b) + mov Cn.forwP(c), a + mov a, N.forwP(b) # save -->shared "forward" indicator + movl $0, N.closing(b) # unset closing indicator +# allocate and initialize server side and client side sockets + mov $SocketInfoL, a + mull N.nodes(b) # SocketInfo array len + mov a, N.sockArrLen(b) # save array len + push a + call malloc + cmp $0, a + jnl 0f + SYSERR "malloc" +0: mov thisP(bp), b # -->this Node + mov a, N.srvSideP(b) # save -->array of server side sockets + pushl N.sockArrLen(b) + call malloc + cmp $0, a + jnl 0f + SYSERR "malloc" +0: mov thisP(bp), b # -->this Node + mov a, N.cliSideP(b) # save -->array of client side sockets +# initialize allocated socket infos + xor c, c # offset +0: mov N.srvSideP(b), a # -->server side SocketInfo array +# movl $So.labelV, So.label(a, c) # indicate SocketInfo block + movl $-1, So.sc(a, c) # indicate server side socket is not in use + mov N.cliSideP(b), a # -->client side SocketInfo array +# movl $So.labelV, So.label(a, c) # indicate SocketInfo block + movl $-1, So.sc(a, c) # indicate client side socket is not in use + lea SocketInfoL(c), c # incr by SocketInfo len + cmp c, N.sockArrLen(b) + ja 0b # iterate + + cmpl $0, N.ssl(b) # node's ssl switch + jz N.initR # no SSL +# prepare SSL context +# call SSL_load_error_strings +# call SSL_library_init + call OPENSSL_init_ssl + LOG 4, "setting SSL contex...", 0 + call TLS_method + push a + call SSL_CTX_new + mov thisP(bp), b # -->this Node + mov a, N.ctxP(b) # save -->new SSL context + test a, a + jnz 0f + SSLERR "new SSL CTX" +# set SSL mode +0: pushl $0 + pushl $4 # SSL_MODE_AUTO_RETRY + pushl $33 # SSL_CTRL_MODE + pushl N.ctxP(b) # -->SSL context + call SSL_CTX_ctrl + movl $0, 8(sp) # no callback + movl $2, 4(sp) # mode = SSL_VERIFY_FAIL_IF_NO_PEER_CERT + call SSL_CTX_set_verify # set peer certificate verification parameters +# set X509 key file name + mov thisP(bp), b # -->this Node + pushl N.locPort(b) # local TCP port# + mov C.csP, c + pushl C.cePathP(c) # -->name of SSL path + pushl $0f # -->format + pushl $strL # buf len + lea str(bp), a + push a # -->string buf + call snprintf + jmp 1f +0: .asciz "%skeys/%u.key" +1: + pushl $1 # SSL_FILETYPE_PEM + lea str(bp), a # filename string + push a + LOG 5, "SSL private key used: %s", 1 + mov thisP(bp), b # -->this Node inst + pushl N.ctxP(b) # -->SSL context + call SSL_CTX_use_PrivateKey_file + cmp $1, a + je 0f + SSLERR "hh's key file" +# set X509 cert file name +0: mov thisP(bp), b # -->this Node + pushl N.locPort(b) # local TCP port # + mov C.csP, c + pushl C.cePathP(c) # -->name of SSL path + pushl $0f + pushl $strL # buf len + lea str(bp), a # string buf + push a + call snprintf + jmp 1f +0: .asciz "%scerts/%u.pem" +1: mov thisP(bp), b # -->this Node + LOG 5, "SSL certificate used: %s", 1 + pushl $1 # SSL_FILETYPE_PEM + lea str(bp), a # filename string + push a + pushl N.ctxP(b) # -->SSL context + call SSL_CTX_use_certificate_file + cmp $1, a + je 0f + SSLERR "hh's cert file" +# set path to CA +0: mov thisP(bp), b # -->this Node + mov C.csP, c + pushl C.caPathP(c) # -->name of SSL path + pushl $0f # -->CApath + LOG 5, "SSL: CA path: %s", 1 + pushl $0 # -->CAfile not used + pushl N.ctxP(b) # -->SSL context + call SSL_CTX_load_verify_locations # set default locations for trusted CA certificates + cmp $1, a + je N.initR + SSLERR "hh's thrusted certs path" +0: .asciz "/home/local/etc/ssl/certs/" +N.initR: + LOG 5, "initalized" + EPILOG +#----------------------------------------------- +# K I C K O F F +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + DL deP # -->Debug + DL sci # socket info rank + DL digest, 24 + EPILOC +N.kickOff: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug +# load payload data + movl C.csP, c # -->CS + push C.txtP(c) # -->initial payload text + push C.ttl(c) # initial TTL +# lea N.data(b), a # -->Data +# push a + push N.dataP(b) # -->Data + call Da.load # load data container + call N.nextNode + mov a, N.next(b) # save next node# + sub N.first(b), a # first - next = socketinfo rank + mov a, sci(bp) # save socket info rank + lea digest(bp), a # -->digest buf + push a + push N.dataP(b) # -->Data + call Da.digest24 + call Da.getDataLen + push N.next(b) # next node# + push a # data len + lea digest(bp), a # -->digest + push a + LOG 2, "kicker: ready to initial send %s, len=%u to node %u" + push N.next(b) + push sci(bp) + call N.conn # connect to next node + push sci(bp) + call N.put # send data to next node + EPILOG +#----------------------------------------------- +# M A I N L O O P : I T E R A T E O N S O C K E T I / O +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + DL deP # -->Debug + EPILOC +N.mainLoop: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug +# prepare + LOG 5, "preparing for I/O select..." + call N.clearSocketMasks # clear socket masks for select and zero nfds + call N.maskSsc # mask ssc for select +# iterate while there are some sockets masked for select (nfds > 0) +0: cmp $0, N.nfds(b) # any I/O in progress ? + jz 0f # no, end operations of node + call N.selectSocketIo # select on server side sockets and forward data + call N.clearSocketMasks # clear socket masks for select and zero nfds + call N.maskSsc # mask ssc for select + call N.maskSrvSockets # mask all connected server side sockets for select + jmp 0b +0: EPILOG +#----------------------------------------------- +# S E L E C T S O C K E T I / O +#----------------------------------------------- +# select on masked sockets +# upon return from select +# accept connections to ssc +# forward data from posted sockets +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + DL deP # -->Debug + EPILOC +N.selectSocketIo: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + + push N.nfds(b) + lea N.rs(b), a + push (a) + push 4(a) + push N.ssc(b) + LOG 5, "select ssc=%u, mask=%08x %08x, nfds=%u" + lea N.t(b), a # -->timeval + movl $1, Ti.secs(a) + movl $0, Ti.usecs(a) + push a + lea N.es(b), a + push a # -->es (exceptional mask is not used yet) + pushl $0 + lea N.rs(b), a # -->rs + push a + push N.nfds(b) + SYS select + lea N.rs(b), a + push (a) + push 4(a) + LOG 5, "return from select, mask of posted=%08x %08x" +# check ssc for incomming connect request, accept it and forward data + lea N.rs(b), a # -->rs select mask + push a + pushl N.ssc(b) + call fd_isset + test a, a # ssc I/O ? + jz N.checkAndForw # no, check other sockets +# prepare for accept; find free srv socket info block for accept + mov N.srvSideP(b), a # -->srv socket info array + xor c, c # offet in socket info array + xor d, d # socket info rank +0: cmp $-1, So.sc(a, c) # socket allocated ? + je 0f # no, free socket found + inc d + lea SocketInfoL(c), c # inc offset into array + cmp c, N.sockArrLen(b) + ja 0b # iterate on sockets + ERR "can't accept, all sockets in use" +# free socket found, accept connection on it and forward data +0: push d + LOG 4, "slot for accept=%d" + call N.acc + call N.forw +# check all server side sockets for pending I/O and call forward on them +N.checkAndForw: + xor c, c # offet into socket info array + lea N.rs(b), a # -->rs select mask + push a + lea -4(sp), sp # adjust stacker for iteration + xor c, c # zero offset + xor d, d # set counter +# iterate through server side sockets and forward from posted sockets +0: mov N.srvSideP(b), a # -->srv socket info array + cmp $-1, So.sc(a, c) # socket connected ? + je 1f # no, iterate + mov So.sc(a, c), a + mov a, (sp) # stack socket# +# LOG 0, "checking port %u" + call fd_isset # socket I/O ? + test a, a + jz 1f # no, iterate + mov N.srvSideP(b), a + mov So.sc(a, c), a + mov d, (sp) # stack socket rank +# LOG 0, "port posted, rank=%u" + call N.forw # forward data +1: inc d + lea SocketInfoL(c), c # inc offset into array + cmp c, N.sockArrLen(b) + ja 0b # iterate on srv side sockets + EPILOG +#----------------------------------------------- +# M A S K C O N N E C T E D S O C K E T S F O R S E L E C T +#----------------------------------------------- +# mask all connected server side socketS for next select + PROLOC + DL thisP # -->this Node + EPILOC +N.maskSrvSockets: + PROLOG + mov b, thisP(bp) # save -->this Node + + lea N.rs(b), a # -->rs select mask + push a + lea -4(sp), sp # adjust stacker for iteration + xor c, c # zero offet +0: mov N.srvSideP(b), a # -->srv socket info array + cmp $-1, So.sc(a, c) # socket connected ? + je 1f # no, iterate + mov So.sc(a, c), a + mov a, (sp) # stack socket# + call N.maskSocket # mask socket for select +1: lea SocketInfoL(c), c # inc offset into array + cmp c, N.sockArrLen(b) + ja 0b # iterate on srv side sockets + EPILOG +#----------------------------------------------- +# M A S K S S C F O R S E L E C T +#----------------------------------------------- +# mask ssc for select until forward is disabled + PROLOC + DL thisP # -->this Node + EPILOC +N.maskSsc: + PROLOG + mov b, thisP(bp) # save -->this Node + + mov N.forwP(b), a + cmp $1, (a) # forwarding enabled ? + jne 0f # no, return + push N.ssc(b) + call N.maskSocket +0: EPILOG +#----------------------------------------------- +# C L E A R S O C K E T M A S K S +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + EPILOC +N.clearSocketMasks: + PROLOG + mov b, thisP(bp) # save -->this Node + + movl $0, N.nfds(b) # zero nfds + lea N.rs(b), a # zero rs select mask + push a + call fd_zero + lea N.es(b), a # zero es select masks + push a + call fd_zero + EPILOG +#----------------------------------------------- +# M A S K S O C K E T F O R S E L E C T +#----------------------------------------------- + ARGS + DS sc # socket# + PROLOC + DL thisP # -->this Node + EPILOC +N.maskSocket: + PROLOG + mov b, thisP(bp) # save -->this Node + + lea N.rs(b), a # -->rs select mask + push a + pushl sc(bp) + call fd_set + mov sc(bp), a # socket# + cmp a, N.nfds(b) # nfds > sc ? + jg 0f # yes + inc a + mov a, N.nfds(b) # nfds = sc + 1 +0: EPILOG +#----------------------------------------------- +# F D S E T O P E R A T I N O S +#----------------------------------------------- + PROLOC # local data frame def. is used by 4 following subrs + DL thisP # -->this Node + dL deP + EPILOC +fd_zero: +# zero FD mask + ARGS + DS fdsetP # -->FD mask + PROLOG + mov fdsetP(bp), a + xor c, c +0: movl $0, (a, c, 4) + inc c + cmp $32, c # size of FD mask is 32 int (1024 bits) + jl 0b + EPILOG +fd_set: +# mask socket in FD mask + ARGS + DS sc + DS fdsetP # -->FD mask + PROLOG + movl sc(bp), a # sc + call fd_mask # a: integer offset, d: "1" bit in position according to sc + mov fdsetP(bp), c # -->select mask + or d, (c, a) # set sc mask + EPILOG +fd_clear: +# clear socket from FD mask + ARGS + DS sc + DS fdsetP # -->FD mask + PROLOG + movl sc(bp), a # sc + call fd_mask # a: integer offset, d: "1" bit in position according to sc + mov fdsetP(bp), c # -->select mask + not d + and d, (c, a) # clear sc mask + EPILOG +fd_isset: +# test socket bit status in FD mask + ARGS + DS sc + DS fdsetP # -->FD mask + PROLOG + movl sc(bp), a # sc + call fd_mask # a: integer offset, d: "1" bit in position according to sc + mov fdsetP(bp), c # -->select mask + and (c, a), d # sc selected ? + mov d, a +# mov $0, a +# jz 0f # not selected +# mov $1, a +0: EPILOG_R +fd_mask: # a: sc +# adjust offsets into FD mask + xor d, d + movl $32, c + divl c # a: mask integer rank, c: bit offset + shll $2, a # a: mask integer offset + mov d, c + mov $1, d + shl %cl, d # d: "1" bit in position according to sc + ret # a: offset of mask integer +#----------------------------------------------- +# F O R W A R D D A T A T O O T H E R N O D E +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank + PROLOC + DL thisP # -->this Node + DL deP # -->Debug inst + DL digest, 24 + EPILOC +#----------------------------------------------- +N.forw: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + + push sci(bp) # socket info rank + call N.get + test a, a # data read ? + jz N.forwCloseSock # no, EOF, close this socket and start closing clients +# kicker checks TTL + cmp $1, N.kicker(b) # kicker ? + jne 1f # no, continue forwarding + + lea -4(sp), sp # adjust stacker + lea digest(bp), a + push a # -->digest buf + push N.dataP(b) # -->Data + call Da.ttl + mov a, 8(sp) # TTL from data header + call Da.digest24 + mov a, 4(sp) # -->text digest + call Da.getPort + mov a, (sp) # remote listen port from data header + LOG 5, "received from node %u: %s, ttl=%d" + push N.dataP(b) # -->Data + call Da.dttl # decrement TTL in data + test a, a # TTl > 0 ? + jz N.forwKickerStop # no, disable forwarding in constellation +# forward data +1: call N.nextNode # calculate next node + mov a, N.next(b) + push a + LOG 4, "next node %u" + sub N.first(b), a # socket info rank (next - first) + mov a, sci(bp) # save +# connect if not connected yet + movl $SocketInfoL, d + mul d # a: offset into socket info array + mov N.cliSideP(b), c # -->client side socket info array + cmp $-1, So.sc(c, a) # connected ? + jne 0f # yes + push N.next(b) + push sci(bp) + call N.conn +0: + lea -4(sp), sp + push N.dataP(b) # -->Data + call Da.ttl + mov a, 4(sp) # ttl + call Da.getDataLen + mov a, (sp) # container len + push N.next(b) # next node# + LOG 5, "forwarding to %d, len=%d, ttl=%d --->" +# pacing + mov C.csP, c # -->CS + cmp $0, C.pacing(c) # pacing active ? + je 0f # no + pushl $0 + lea C.pace.tv_sec(c), a + push a + SYS nanosleep # pace +0: push sci(bp) + call N.put + LOG 4, "leaving forward" + jmp N.forwR # exit forwarding +N.forwKickerStop: # ttl = 0 + lea digest(bp), a + push a # -->digest buf + push N.dataP(b) # -->Data + call Da.digest24 + push a + LOG 1, "received after finally passing all: %s", 1 + mov N.forwP(b), a # -->shared forw indicator + movl $0, (a) # disable forwarding for all nodes in constellation + jmp N.forwCloseCli +N.forwCloseSock: + pushl deP(bp) + pushl $1 # indicate "server side" + push sci(bp) # socket info rank + call N.closeSocket # close this server side socket +N.forwCloseCli: + call N.closeClients # launch client side closing thread + LOG 4, "leaving forward, closing" +N.forwR: + EPILOG +#----------------------------------------------- +# D E T E R M I N E N E X T N O D E +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + EPILOC +N.nextNode: + PROLOG + mov b, thisP(bp) # save -->this Node + + cmp $Cn.ring, N.topo(b) # ring ? + jne 0f # no +# ring + mov N.locPort(b), a + inc a # next = locPort + 1 + cmp a, N.last(b) # next > last ? + jnl N.nextNodeR + mov N.first(b), a + jmp N.nextNodeR +# mash +0: call random + xor d, d + mov thisP(bp), b + divl N.div(b) # 0 <= random < nodes + add N.first(b), a # first + random + cmp a, N.locPort(b) + je 0b # iterate until other then local node is selected +N.nextNodeR: + push a + EPILOG_R + +#----------------------------------------------- +# B I N D T O L O C A L T C P P O R T +#----------------------------------------------- +# returns +# nothing + PROLOC + DL thisP # -->this Node + DL deP # -->Debug + DL i + DL aiP # -->IP addrinfo + strL = 64 + DL str, strL # string buf + EPILOC +#----------------------------------------------- + .global N.bind +N.bind: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + LOG 4, "binding...", 0 +# prepare IP addr + pushl $AddrInfoL + call malloc # alloc addrinfo for hints + mov a, aiP(bp) + pushl $AddrInfoL + pushl $0 + push a + call memset # clear hints + + movl $2, ai_family(a) # AF_INET + movl $1, ai_flags(a) # AI_PASSIVE for bind + movl $0, ai_protocol(a) # any protocol + movl $1, ai_socktype(a) # SOCK_STREAM, blocking type + + mov thisP(bp), b # -->this Node + pushl N.locPort(b) # local port # + push $0f + lea str(bp), a + push a # -->str buff + call sprintf + jmp 1f +0: .ascii "%d\0" +1: + lea aiP(bp), a + push a # -->-->addrinfo for new addrinfo + pushl aiP(bp) # -->addrinfo hints + lea str(bp), a + push a # -->loc port # string + pushl $0f + call getaddrinfo + jmp 1f +0: .ascii "localhost\0" +# check IP addr +1: cmp $0, a # getaddrinfo OK ? + jz 0f # yes + push a + call gai_strerror + push a + LOG 0, "getaddrinfo error: %s, ABEND", 1 + push $1 + call exit +# log assigned IP addr +0: pushl deP(bp) + mov aiP(bp), a # -->addrinfo + pushl ai_addrP(a) # -->sockaddr + lea str(bp), a + push a # -->string buf + call gai + + lea str(bp), a + push a + LOG 5, "getaddrinfo OK, %s", 1 +# allocate socket + mov aiP(bp), a + pushl ai_protocol(a) + pushl ai_socktype(a) + pushl ai_family(a) + SYS socket + mov a, N.ssc(b) # save ssc +# set socket option SO_REUSEADDR + pushl $4 # integer width + lea 9f, a # value 1 + push a + pushl $2 # SO_REUSEADDR + pushl $1 # SOL_SOCKET + push N.ssc(b) # ssc + SYS setsockopt + jmp 8f +9: .long 1 +8: +# bind + mov aiP(bp), a + pushl ai_addrlen(a) + pushl ai_addrP(a) + push N.ssc(b) # ssc + SYS bind +# listen + pushl $1 # pend conns queue len + pushl N.ssc(b) # ssc + SYS listen + pushl N.locPort(b) + LOG 2, "bound to %d", 1 +N.bindR: + EPILOG +#----------------------------------------------- +# C O N N E C T T O R E M O T E N O D E +#----------------------------------------------- +# args + ac = 40 + DS i # socket rank + DS remPort # TCP port# on remote site +# returns +# nothing +# local vars + ac = 0 + DL debug, DebugL # Debug inst + DL deP # -->Debug inst + DL retry # retry counter + DL ai, AddrInfoL # IP addrinfo + DL aiP # -->IP addrinfo + DL str, 64 # string buf + DL currSockP # -->SocketInfo save area + locL = ac +#----------------------------------------------- + .global N.conn +N.conn: + PROLOG + mov b, thisP(bp) # save -->this Node + lea debug(bp), a # -->own local debug block + mov a, deP(bp) # save -->local Debug + pushl remPort(bp) # remote port + lea N.debug(b), d # -->node's Debug + lea D.id(d), a # -->node's debug ID + push a + DEBID "%s to %u", 2 # set local debug ID + LOG 3, "connecting...", 0 + + mov N.cliSideP(b), c # -->SocketInfo array + mov $SocketInfoL, a + mull i(bp) # SocketInfo offset + lea (c, a), c # -->curr SocketInfo + mov c, currSockP(bp) # save -->curr SocketInfo + mov remPort(bp), a + mov a, So.remPort(c) # put remote port # into curr SocketInfo +# prepare IP addrinfo hints + pushl $AddrInfoL # IP addrinfo length + pushl $0 + lea ai(bp), a + mov a, aiP(bp) # -->IP addrinfo + push a + call memset # clear addrinfo + movl $2, ai_family(a) # AF_INET + movl $0, ai_flags(a) + movl $0, ai_protocol(a) # any protocol + movl $1, ai_socktype(a) # SOCK_STREAM +# set up port # string + mov currSockP(bp), a # -->curr socket info + pushl So.remPort(a) # remote port# + pushl $0f + lea str(bp), a + push a # -->port# string + call sprintf + jmp 1f +0: .ascii "%u\0" +# get IP addrinfo block chain +1: lea aiP(bp), a # -->-->IP addrinfo + push a + pushl aiP(bp) # -->IP addrinfo hints + lea str(bp), a # -->remote port # string + push a + pushl $0f + call getaddrinfo + jmp 1f +0: .ascii "localhost\0" +# check IP addr +1: cmp $0, a # getaddrinfo OK ? + jz 0f # yes + push a + call gai_strerror + push a + LOG 0, "getaddrinfo error: %s, ABEND", 1 + push $1 + call exit # ABEND +# log assigned IP addr +0: pushl deP(bp) # -->Debug inst + mov aiP(bp), a # -->IP addrinfo + pushl ai_addrP(a) # -->sockaddr + lea str(bp), a + push a # -->string buf + call gai + LOG 5, "getaddrinfo OK, %s", 1 +# allocate comm socket + mov aiP(bp), a # -->IP addrinfo + pushl ai_protocol(a) + pushl ai_socktype(a) + pushl ai_family(a) +# call socket # allocate comm socket +# cmp $0, a +# jg 0f +# SYSERR "socket alloc" +#0: + SYS socket + mov currSockP(bp), c # -->curr SocketInfo + mov a, So.sc(c) # put new socket # into SocketInfo +# connect to remote partner server side + mov C.csP, a + mov C.connTh(a), a # conn retry threshold + mov a, retry(bp) # initilize retry counter + mov aiP(bp), a # -->IP addrinfo + pushl ai_addrlen(a) + pushl ai_addrP(a) + pushl So.sc(c) # socket # +2: call connect + cmp $0, a + jnl 0f # connected + call __errno_location # get --> sys errno + cmp $111, (a) # ECONNREFUSED ? + je 1f # yes, wait to retry + SYSERR "connect" # no, ABEND +1: pushl $22000 # 22 msecs + call usleep + lea 4(sp), sp + sub $1, retry(bp) + jnz 2b # iterate +# conn retry threshold reached + mov C.csP, a + pushl C.connTh(a) + LOG 0, "connection refused, threshold %d reached", 1 + pushl $1 + call exit # ABEND +0: +# connected, account new connection + mov C.csP, c # -->CS + mov C.shP(c), a # -->shared mem + lea S.counter_sem(a), a # -->overall counter semaphore + push a + SYS sem_wait + mov C.csP, c # -->CS + mov C.shP(c), a # -->shared mem + incl S.conns(a) # incr conns counter + lea S.counter_sem(a), a # -->overall msg counter semaphore + push a + SYS sem_post +# prepare SSL + cmp $1, N.ssl(b) # node's ssl switch + jne N.connRet # no SSL + + LOG 5, "prepare for SSL" + pushl N.ctxP(b) # -->SSL context + call SSL_new # create a SSL structure + + mov currSockP(bp), c # -->curr SocketInfo + mov a, So.sslP(c) # put -->SSL struct into SocketInfo + test a, a + jnz 0f + SSLERR "new SSL" +# connect on SSL +0: call ERR_clear_error + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sc(c) # socket + pushl So.sslP(c) # -->SSL struct + call SSL_set_fd # connect the SSL object with a file descriptor + test a, a + jnz 0f + SSLERR "client SSL set fd" +0: + LOG 5, "SSL fd set" + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sslP(c) # -->SSL struct + call SSL_connect + test a, a # connected ? + jns 1f # yes + push a + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sslP(c) # -->SSL struct + call SSL_get_error # SSL_get_error(-->ssl, err) + cmp $5, a # SSL_ERROR_SYSCALL ? + jne 0f # no, other SSL err + SYSERR "SSL connect" +0: SSLERR "SSL connect" +1: + mov C.csP, a + mov C.connTh(a), a + sub retry(bp), a # # of retries = conn threshold - counter + push a + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sc(c) # socket# + LOG 2, "connected via sc=%d after %d retries", 2 +N.connRet: + EPILOG +#----------------------------------------------- +# A C C E P T C O N N E C T I O N +#----------------------------------------------- + ARGS + DS i # socket rank +# returns +# nothing +# local vars + PROLOC + DL thisP # -->this Node + DL deP # -->Debug inst + DL sa, SockAddrL # sockaddr + DL str, 64 # string buf + DL currSockP # -->curr SocketInfo save area + EPILOC +#----------------------------------------------- + .global N.acc +N.acc: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + LOG 4, "accepting..." + + mov N.srvSideP(b), c # -->SocketInfo array + mov $SocketInfoL, a + mull i(bp) # curr SocketInfo offset + lea (c, a), a # -->curr SocketInfo + mov a, currSockP(bp) # save -->curr SocketInfo +# accept + pushl $0 + pushl $0 +# mov thisP, b # -->Node inst + pushl N.ssc(b) # listen socket +# call accept +# cmp $0, a +# jnl 0f +# SYSERR "accept" +#0: + SYS accept + mov currSockP(bp), c # -->Node inst + mov a, So.sc(c) # save comm socket of accepted connection + pushl $0f # -->sa length + lea sa(bp), a # -->sockaddr + push a + pushl So.sc(c) # comm socket + call getpeername + jmp 1f +0: .int SockAddrL +1: pushl deP(bp) + lea sa(bp), a # -->sockaddr + push a + lea str(bp), a # -->string buf + push a + call gai # get string of addr/port + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sc(c) # comm socket + LOG 3, "peer: sc=%d, %s", 2 +# prepare SSL + mov thisP(bp), b # -->this Node + cmp $1, N.ssl(b) # node's ssl switch + jne 1f # no SSL + + pushl N.ctxP(b) # -->SSL context + call SSL_new # create a SSL structure + mov currSockP(bp), c # -->curr SocketInfo + mov a, So.sslP(c) # put -->SSL struct into SocketInfo + test a, a + jnz 0f + SSLERR "new SSL" +# accept on SSL +0: call ERR_clear_error + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sc(c) # socket # + pushl So.sslP(c) # -->SSL struct + call SSL_set_fd # connect the SSL object with a file descriptor + test a, a + jnz 0f + SSLERR "client SSL set fd" +0: + LOG 5, "SSL fd set" + call SSL_accept + test a, a + jns 1f + mov a, 4(sp) + call SSL_get_error # SSL_get_error(-->ssl, err) + cmp $5, a # SSL_ERROR_SYSCALL ? + jne 0f # no, other SSL err + SYSERR "SSL accept" +0: SSLERR "SSL accept" +1: + LOG 2, "accepted" + EPILOG +#----------------------------------------------- +# R E A D F R O M C O M M S O C K E T +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank +# returns length read + PROLOC + DL thisP # -->this Node + DL deP # -->Debug inst + DL shP # -->shared mem + DL currSockP # -->curr SocketInfo + DL n # len read + DL rest # len to be read + DL buf # read dest + EPILOC +#----------------------------------------------- + .global N.read +N.read: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + + mov C.csP, a # -->CS + mov C.shP(a), a # -->shared mem + mov a, shP(bp) # save -->shared mem + push N.dataP(b) + call Da.getDataLen + mov a, rest(bp) # init len to be read + call Da.getContP + mov a, buf(bp) # init read dest + mov N.srvSideP(b), c # -->server side SocketInfo array + mov $SocketInfoL, a + mull sci(bp) # curr SocketInfo offset + lea (c, a), c # -->curr SocketInfo + mov c, currSockP(bp) # save -->curr SocketInfo + + push So.sc(c) # comm socket + push rest(bp) # len to be read + LOG 5, "ready to read len=%d from sc=%d..." +# iterate on reading until whole length is read +N.readIterate: + cmp $0, rest(bp) # something to be read ? + jng N.readAcc # no, end reading + push rest(bp) # len to be read + push buf(bp) # read dest + mov currSockP(bp), c # curr SocketInfo + cmp $1, N.ssl(b) # SSL ? + jne 0f # no +# read w/ SSL + push So.sslP(c) # -->SSL struct + SYS SSL_read + jmp 1f +# read w/o SSL +0: + push So.sc(c) # socket # + SYS read +# adjust read controls +1: + jz N.readEof # EOF read + add a, buf(bp) # adjust -->read dest + sub a, rest(bp) # decrement rest to read + cmp $0, rest(bp) # everything read ? + jng 0f + push a + LOG 5, "partly read %d bytes" +0: jmp N.readIterate +# account data just read +N.readAcc: + mov shP(bp), a # -->shared mem + lea S.counter_sem(a), a # overall msg counter semaphore + push a + SYS sem_wait # get semaphore + mov shP(bp), a + incl S.msgs(a) # incr msgs counter + lea S.counter_sem(a), a # overall msg counter semaphore + push a + SYS sem_post # post semaphore + + lea -4(sp), sp # adjust stacker + push N.dataP(b) # -->Data + call Da.getPort + mov a, 4(sp) # remote listen port + call Da.getDataLen + mov a, (sp) # data length + LOG 5, "%d read from node %u" + mov (sp), a # return len read + jmp N.readRet +# EOF read, finish +N.readEof: + LOG 4, "EOF read" +N.readRet: + EPILOG_R +#----------------------------------------------- +# W R I T E T O C O M M S O C K E T +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank +# returns length written + PROLOC + DL thisP # -->this Node + DL deP # -->Debug inst + DL currSockP # -->curr SocketInfo + DL n # len written + DL rest # len to be written + DL buf # write orig + EPILOC +#----------------------------------------------- + .global N.write +N.write: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug +# init control values + mov C.csP, a # -->CS + mov C.shP(a), a # -->shared mem + mov a, shP(bp) # save -->shared mem + push N.dataP(b) # -->Data + call Da.getDataLen + mov a, rest(bp) # init len to be written + call Da.getContP + mov a, buf(bp) # init write orig + mov N.cliSideP(b), c # -->SocketInfo array + mov $SocketInfoL, a + mull sci(bp) # curr SocketInfo offset + lea (c, a), c # -->curr SocketInfo + mov c, currSockP(bp) # save -->curr SocketInfo + + push So.sc(c) # comm socket + push rest(bp) # len to be written + LOG 5, "ready to write len=%d to sc=%d..." +# iterate on writing until whole length is written +N.writeIterate: + cmp $0, rest(bp) # something to be written ? + jng N.writeEnd # no, end writing + push rest(bp) # len to be written + push buf(bp) # write orig + cmp $1, N.ssl(b) # SSL ? + jne 0f # no +# SSL + pushl So.sslP(c) # -->SSL struct + SYS SSL_write + jmp 1f +# no SSL +0: + pushl So.sc(c) # socket + SYS write +# adjust write controls +1: + mov a, n(bp) # save len written + add a, buf(bp) # adjust write orig + sub a, rest(bp) # decrement rest to write + cmp $0, rest(bp) # everything written ? + jng 0f + push a + LOG 5, "partly written %d bytes" +0: jmp N.writeIterate +# return datalen +N.writeEnd: + mov N.dataP(b), a + mov Da.datalen(a), a + push a + LOG 5, "%d written", 1 + EPILOG_R +#----------------------------------------------- +# G E T D A T A F R O M C O M M S O C K E T +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank +# returns boolean: 0 - EOF read, 1 - data read + PROLOC + DL thisP # -->this Node + DL deP + EPILOC +#----------------------------------------------- + .global N.get +N.get: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + push sci(bp) # SocketInfo rank + LOG 5, "get data from socket, rank=%u" + call N.read + test a, a + jz 0f + mov $1, a +0: EPILOG_R +#----------------------------------------------- +# P U T D A T A T O C O M M S O C K E T +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank +# returns boolean: 0 - nothing written, 1 - data written + PROLOC + DL thisP # -->this Node + DL deP + EPILOC +#----------------------------------------------- + .global N.put +N.put: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + LOG 5, "put data to socket" + + mov N.dataP(b), d # -->Data + mov Da.contP(d), a # -->container + lea Co.hdr(a), a # -->header + mov N.locPort(b), c + mov c, H.lport(a) # listen port to header + + pushl sci(bp) # SocketInfo rank + call N.write + test a, a + jz 0f + mov $1, a +0: EPILOG_R +#----------------------------------------------- +# C L O S E C O N N E C T I O N S O C K E T +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank + DS server # indicate server or client + DS deP # -->Debug inst +# returns:# nothing + PROLOC + DL thisP # -->this Node + DL currSockP # -->curr SocketInfo + DL tagP + DL e + EPILOC +N.srvSideTag: .asciz "server side" +N.cliSideTag: .asciz "client side" +#----------------------------------------------- + .global N.closeSocket +N.closeSocket: + PROLOG + mov b, thisP(bp) # save -->this Node + mov N.srvSideP(b), c # -->SocketInfo array + movl $N.srvSideTag, tagP(bp) + cmp $1, server(bp) # server side ? + je 0f + mov N.cliSideP(b), c # client side + movl $N.cliSideTag, tagP(bp) +0: mov $SocketInfoL, a + mull sci(bp) # curr SocketInfo offset + lea (c, a), c + mov c, currSockP(bp) # save -->curr SocketInfo + push So.sc(c) # socket to close + push tagP(bp) + LOG 5, "closing %s sc=%u...", 2 + + cmp $0, N.ssl(b) # SSL ? + je 2f # no +# SSL shutdown + pushl So.sslP(c) # -->SSL structure + call SSL_shutdown + cmp $0, a + jns 0f # no err + SSLERR "SSL shutdown (1)" +0: jnz 2f # SSL shutdown finished + LOG 5, "SSL shutdown rc=0, retry" + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sslP(c) # -->SSL structure + call SSL_shutdown + cmp $0, a + jg 2f # SSL shutdown finished + push a # SSL err + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sslP(c) # -->SSL structure + call SSL_get_error + cmp $5, a # SSL_ERROR_SYSCALL ? + jne 1f # no, other SSL err +# syscall err + call ERR_get_error + test a, a # SSL err ? + jnz 2f # yes, don't care + call __errno_location # sys err ? + test a, a + jz 2f # no + SYSERR "SSL shutdown (2)" +# other SSL err, print SSL err queue +1: SSLERR "SSL shutdown (2)" +# close socket +2: mov currSockP(bp), c # -->curr SocketInfo + pushl So.sc(c) # socket to close +# call close +# test a, a +# jz 0f +# SYSERR "close" +#0: + SYS close + push tagP(bp) + LOG 4, "%s sc=%u closed", 2 + mov currSockP(bp), c # -->curr SocketInfo + movl $-1, So.sc(c) # indicate closed socket + EPILOG +#----------------------------------------------- +# C R E A T E T H R E A D T O C L O S E C L I E N T S I D E S +#----------------------------------------------- + ARGS + PROLOC + DL thisP # -->this Node + DL deP # -->Debug inst + DL pt # pthread_t (int) + EPILOC +#----------------------------------------------- +N.closeClients: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + cmp $1, N.closing(b) + je 0f # already closing, nothing to do + LOG 5, "spawning thread to close client side" + push b # -->this (param for thread) + pushl $N.closeCliThread # -->thread func + pushl $0 + lea N.ptid(b), a # -->thread ID + push a + call pthread_create + test a, a + jz 1f + SYSERR "create thread" +1: mov thisP(bp), b # -->this Node + movl $1, N.closing(b) # indicate "closing" + push N.ptid(b) + LOG 5, "closing thread (%u) spawned" +0: EPILOG +#----------------------------------------------- +# C L O S E C L I E N T S I D E S +#----------------------------------------------- + ARGS + DS thisP + PROLOC + DL deP # -->Debug inst + DL debug, DebugL + EPILOC +#----------------------------------------------- +N.closeCliThread: + PROLOG + mov thisP(bp), b # -->this Node + lea debug(bp), a + mov a, deP(bp) # save deP + lea N.debug(b), a + lea D.id(a), a # -->caller debug id + push a + DEBID "%s CLOSE clients", 1 + LOG 5, "start..." + mov $0, c # offet in socket info array + mov $0, d # socket info rank + mov N.cliSideP(b), a # -->client socket info array +0: # iterate on opened sockets + cmp $-1, So.sc(a, c) # sc allocated ? + je 1f # no, iterate + push So.sc(a, c) # socket# + LOG 4, "sc=%u" + push deP(bp) + pushl $0 # indicate "client" + push d # socket info rank + call N.closeSocket # close socket +1: inc d + cmp N.nodes(b), d # rank < nodes ? + je 0f # no, end + lea SocketInfoL(c), c # incr offset + jmp 0b # iterate +0: LOG 4, "finished, exiting thread" + pushl $0 + call pthread_exit + EPILOG +#----------------------------------------------- +# S Y N C H R O N I Z E A L L N O D E S +#----------------------------------------------- + ARGS + PROLOC + DL thisP + DL deP + DL shP + DL tosignal + DL sigmask, 128 + DL sigact, SigActionL + EPILOC +N.sync: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save -->Debug locally for LOG macro + + movl $0, tosignal(bp) + mov C.csP, c # -->CS + mov C.shP(c), a # -->shared mem + push S.act(a) + mov a, shP(bp) + push N.pid(b) + LOG 5, "%u synchronizing: active=%u" +# set signal handler block + lea sigact(bp), a + movl $N.sighandle, sa_handler(a) + movl $0, sa_flags(a) + lea sa_mask(a), a # set handler mask + push a + SYS sigfillset + jmp 0f +N.sighandle: + ret +0: +# activate signal handler + lea sigmask(bp), a + push a + SYS sigemptyset + push $SIGUSR2 + lea sigmask(bp), a + push a + SYS sigaddset + push $0 + lea sigmask(bp), a + push a + push $SIG_UNBLOCK + SYS sigprocmask + push $0 + lea sigact(bp), a + push a + push $SIGUSR2 + SYS sigaction +# update nodes counter + mov shP(bp), a + lea S.counter_sem(a), a + push a + SYS sem_wait # get semaphore + mov shP(bp), a + decl S.act(a) # decrement nodes counter + setz tosignal(bp) + mov shP(bp), a + lea S.counter_sem(a), a + push a + SYS sem_post # post semaphore + testl $1, tosignal(bp) # all finished? + jz N.sigwait +# signal all tasks + push N.pid(b) + LOG 5, "%u signaling USR2" + push $SIGUSR2 + push $0 + SYS kill + jmp N.woken +# wait for signal +N.sigwait: +# call pause + push $5 + call sleep +N.woken: + mov thisP(bp), b # -->this Node + push N.pid(b) + LOG 5, "%u woken up" + EPILOG +#----------------------------------------------- +# F O R M A T A N D P R I N T S S L E R R O R +#----------------------------------------------- +# args + ac = 8 + DS deP # -->Debug + DS msgP # -->accompanying msg str +# returns: nothing + PROLOC + DL str, 256 # generated string buf + EPILOC +#----------------------------------------------- + .global N.sslErr +N.sslErr: + enter $locL, $0 +0: + call ERR_get_error + push a + LOG 5, "ssl err, e=%lu", 1 + test a, a # end of msg queue ? + jz 0f # yes + lea str(bp), d + push d # -->err msg str buf + push a # err + call ERR_error_string + push a # -->err msg str + pushl msgP(bp) # -->accompanying msg str + LOG 0, "%s: %s", 2 + jmp 0b # iterate on SSL err msg queue +0: + pushl $1 + call exit # ABEND +#----------------------------------------------- +# G E T A S S I G N E D I P A D D R +#----------------------------------------------- +# puts assigned IP address:port in string + ARGS + DS sP # -->output string + DS saP # -->sockaddr + DS deP # -->Debug +# returns: nothing + PROLOC + EPILOC +#----------------------------------------------- +gai: PROLOG + + mov saP(bp), b # -->sockaddr + xor a, a + mov sa_data(b), %ax # port # in big endian order + push a + call ntohs + mov a, d + + mov saP(bp), b # -->sockaddr + pushl sa_data+2(b) # IP addr in big endian order + call ntohl + + push d # port # as int + + mov $4, c # iterate on 4 bytes of IP addr +0: mov a, d + and $0xff, d + push d # part of IP addr + shr $8, a + dec c + jnz 0b # iterate + + push $0f + pushl sP(bp) # -->output buf + call sprintf + jmp 1f +0: .ascii "%u.%u.%u.%u:%d\0" +1: + EPILOG +#----------------------------------------------- + .end diff -r 000000000000 -r 5c129dd80d4f CSc/CS.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSc/CS.c Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,158 @@ +#include "CS.h" + +CSP csP; +DebugP deP; + +void abend(DebugP deP) { +// LOG(0, "ABORT, netstat:"); +// int pid; +// if(!(pid = fork())) { +// execl("/bin/bash", "/bin/bash", "-c", "netstat -n --inet -a | grep 1[123][0-9][0-9][0-9] >net_stat", (char *) NULL); +// exit(EXIT_SUCCESS); +// } +// waitpid(pid, NULL, 0); + LOG(0, "ABORT, backtrace:"); + back_trace(); + kill(0, SIGTERM); + exit(EXIT_FAILURE); +} +char *gpa(struct sockaddr *ai_addr) { // returns string with IP4 address & port assigned to the socket + char *s = malloc(64); + unsigned short port = *(unsigned short*) ai_addr->sa_data; + char *a = ai_addr->sa_data + 2; + sprintf(s, "%hhu.%hhu.%hhu.%hhu %hu", a[0], a[1], a[2], a[3], ntohs(port)); + return s; +} +void gai(int level, struct addrinfo *ai, DebugP pr) { // logs assigned IP4 addresses from addrinfo chain + struct addrinfo *sa = ai; + if (csP->debMaxLev >= level) do { + char *s = gpa(sa->ai_addr); + strcpy(pr->msg, s); free(s); + deb(level, pr); + } while ((sa = sa->ai_next)); + fflush(stderr); +} +void ssl_err(DebugP pr, char *s) { + long e; + e = ERR_get_error(); + LOG(4, "ssl err, e=%lu", e); + while(e) { + LOG(0, "%s: %s", s, ERR_error_string(e, NULL)); + e = ERR_get_error(); + LOG(4, "ssl err, e=%lu", e); + } + abend(pr); +} +static int getArg(char *a) { + return (getenv(a) != NULL) ? atoi(getenv(a)) : 0; +} +static void constellation(topology topo, int ssl) { + DEBID("%sSSL %s", ssl ? "" : "non", topo==mash ? "MASH" : "RING"); + int first, nodes, *forw, pid, stat, exitRc = EXIT_SUCCESS; + pid_t *pids; + if(topo==ring) { first=csP->rp0; nodes=csP->rn; } + if(topo==mash) { first=csP->mp0; nodes=csP->mn; } + if(nodes == 0) exit(0); + if(nodes == 1) { LOG(0, "1 node configuration not implemented"); exit(0); } + LOG(1, "%d node(s) starting...", nodes); + pids = malloc(nodes*sizeof(int)); + first += ssl*500; + if((forw = mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap"); + *forw = 1; + for(int i = 0; i < nodes; i++) { + if(!(pid = fork())) Node(topo, forw, first+i, first, nodes, ssl); + else { LOG(3, "node %u established in process %u", first+i, pid); pids[i] = (pid_t)pid; } + } + LOG(2, "all nodes established"); + while((pid = wait(&stat)) > 0) { + if(WIFEXITED(stat)) { + LOG(4, "node process %u ended with exit(%d)", pid, WEXITSTATUS(stat)); + if(WEXITSTATUS(stat) != EXIT_SUCCESS) exitRc = EXIT_FAILURE; + } + else { LOG(4, "node process %u crashed", pid); exitRc = EXIT_FAILURE; } + } + LOG(1, "ENDED %s", exitRc == EXIT_SUCCESS ? "OK" : "BADLY"); + exit(exitRc); +} +int main(int argc, char *argv[]) { + CST cs; + csP = &cs; + deP = &(csP->debug); + ShareA(csP->shP); + struct sigaction ign; + + if(sem_init(&(csP->shP->debugSem), 1, 1) < 0) ERR("LOG sem_init"); + debug_init(argv[0]); + DEBID("client/server demo"); + if(getArg("DEB") >= 0) csP->debMaxLev = getArg("DEB"); + + if(sem_init(&csP->shP->counterSem, 1, 1) < 0) SYSERR("sem_init"); + ign.sa_flags = 0; + ign.sa_handler = SIG_IGN; + if(sigaction(SIGUSR2, &ign, NULL)<0) SYSERR("sigaction"); + + csP->shP->msgs = 0; + csP->shP->conns = 0; + csP->text="bla bla"; + csP->ttl = 3; + csP->rp0 = 11000; + csP->mp0 = 12000; + csP->rn = 0; + csP->mn = 0; + csP->pace.tv_sec = csP->pace.tv_nsec = 0; + csP->connThreshold = 77; // connection retries threshold + csP->connTO = 0.01 * 1000*1000; // connection timeout in usecs + csP->selTO = 1; // selection timeout in secs + csP->issl = 0; + csP->caP = (char*)"/home/local/etc/ssl/certs/"; + char *sslPathSuffP = "/../CS/"; + + if(getenv("T") != NULL) csP->text = getenv("T"); + if(getenv("CEP") != NULL) { + csP->ceP = (char*)malloc(strlen(getenv("CEP"))); + strcpy(csP->ceP, getenv("CEP")); } + else {csP->ceP = (char*)malloc(strlen(dirname(argv[0])) + strlen(sslPathSuffP) + 1); + strcpy(csP->ceP, dirname(argv[0])); + strcpy(csP->ceP + strlen(dirname(argv[0])), sslPathSuffP); } + if(getenv("CAP") != NULL) { + csP->caP = (char*)malloc(strlen(getenv("CAP")) + 1); + strcpy(csP->caP, getenv("CAP")); } + if(getArg("TTL") > 0) csP->ttl = getArg("TTL"); + if(getArg("RP0") > 0) csP->rp0 = getArg("RP0"); + if(getArg("MP0") > 0) csP->mp0 = getArg("MP0"); + if(getArg("N") >= 0) { + csP->mn = getArg("N"); + csP->rn = csP->mn; } + if(getArg("SSL") >= 0) csP->issl = getArg("SSL"); + if(getArg("RN") >= 0) csP->rn = getArg("RN"); + if(getArg("MN") >= 0) csP->mn = getArg("MN"); + csP->shP->act = csP->rn + csP->mn; // initialize active node processes counter + if(csP->issl > 1) csP->shP->act += csP->shP->act; + csP->pacing = 0; + if(getenv("P") != NULL) { + double d = atof(getenv("P")); + csP->pace.tv_sec=(time_t)trunc(d); + csP->pace.tv_nsec=(d-csP->pace.tv_sec)*1000*1000*1000; + if(csP->pace.tv_sec > 0 || csP->pace.tv_nsec > 0) csP->pacing = 1; + } + if(getArg("RS") >= 0) srandom(getArg("RS")); +// LOG(5, "initialized: ssl=%u, ssl path: %s, CA path: %s, pace=%f", +// csP->issl, csP->ceP, csP->caP, (double)csP->pace.tv_sec+(double)csP->pace.tv_nsec/(1000*1000*1000)); + LOG(1, "pgm=%s, ttl=%u, pace=%lu.%03lu, seed=%u, SSL=%u, debug=%d",\ + argv[0], csP->ttl, csP->pace.tv_sec, csP->pace.tv_nsec/(1000*1000), getArg("RS"), csP->issl, csP->debMaxLev); + if(csP->issl > 0) LOG(3, "certs path=%s, CA certs path=%s", csP->ceP, csP->caP); + + if(csP->issl < 2) { // 0 = no ssl, 1 = with SSL, 2 = both + if(!fork()) constellation(ring, csP->issl); + if(!fork()) constellation(mash, csP->issl); + } else for(int i = 0; i < 2; i++) { + if(!fork()) constellation(ring, i); + if(!fork()) constellation(mash, i); + } + + int stat, pid; + while((pid = wait(&stat)) > 0) if(WIFEXITED(stat)) LOG(4, "constellation process %u ended with exit(%d)", pid, WEXITSTATUS(stat)); + + LOG(1, "final balance: forwards=%d, connections=%d", csP->shP->msgs, csP->shP->conns); + return 0; +} diff -r 000000000000 -r 5c129dd80d4f CSc/CS.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSc/CS.h Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,155 @@ +#ifndef _CSH_ +#define _CSH_ 1 + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +typedef struct DebugS { + char id[128]; + char msg[256]; +} DebugT, *DebugP; +#define DebugA() (DebugP)malloc(sizeof(DebugT)) + +typedef struct ShareS { + int conns; + int msgs; + int act; + int mashOpenClientCount; + int mashOpenSSLClientCount; + sem_t counterSem; + sem_t debugSem; +} ShareT, *ShareP; +#define ShareA(v) if((v = mmap(NULL, sizeof(ShareT), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap") + +typedef struct CSS { + DebugT debug; + int debMaxLev; + int port0; + int portZ; + char *text; + int ttl; + int mp0; + int mn; + int rp0; + int rn; + struct timespec pace; + int pacing; + int issl; + int connThreshold; // connection retry threshhold + int connTO; // connection timeout in usecs + int selTO; // selection timeout in secs + char *ceP; // -->SSL certs path + char *caP; // -->CA certs path + ShareP shP; + char home[128]; +} CST, *CSP; +#define CSA(v) if((v = (CSP)malloc(sizeof(CST))) < 0) SYSERR("malloc") + +typedef struct HeaderS { + int ttl; + int ts; + int listPort; +} HeaderT, *HeaderP; +typedef struct PayloadS { + time_t ts; + char text; +} PayloadT, *PayloadP; +typedef struct ContainerS { + HeaderT hdr; + PayloadT payl; +} ContainerT, *ContainerP; +typedef struct DataS { + DebugT debug; + ContainerP contP; + int dataLen; +} DataT, *DataP; +#define DataA(v) if((v = (DataP)malloc(sizeof(DataT))) < 0) SYSERR("malloc") + +typedef struct SocketS { + int remPort; + int sc; + SSL *sslP; +} SocketT, *SocketP; +#define SocketA(v) if((v = (SocketP)malloc(sizeof(SocketT))) < 0) SYSERR("malloc") + +typedef enum{ring, mash} topology; +typedef struct NodeS { + DebugT debug; + topology topo; + int ssc; + int locPort; + int first; + int last; + int nodes; + int nodeIdx; + int kicker; + int *forw; + int closing; + pthread_t closingThread; + SocketP cliSides; + SocketP srvSides; + DataT data; + int len; + int ssl; + SSL_CTX *ctx; + struct sigaction *sigactP; +} NodeT, *NodeP; +#define NodeA(v) if((v = (NodeP)malloc(sizeof(NodeT))) < 0) SYSERR("malloc") +typedef enum{client, server} nodeside; + +extern void debug_init(); +extern char *gpa(struct sockaddr *); +extern void gai(int, struct addrinfo *ai, DebugP); +#define GAI(level, ai) gai(level, ai, deP) +extern void ssl_err(DebugP, char*); +extern void abend(DebugP); +extern void Node(topology, int*, int, int, int, int); +extern void Data(DataP, DebugP); +extern int dttlData(DataP); +extern int ttlData(DataP); +extern int remPortData(DataP); +extern DataP loadData(DataP, int, char*); +extern char *unldData(DataP); +extern char *digest24Data(DataP, char *); +extern int chkData(DataP); +extern void sabotageData(DataP); +extern int tsData(DataP); +extern void back_trace(); +extern void pre(char *); +extern void deb(int, DebugP); +extern void err(int, char *, DebugP); + +#define DEBID(...) sprintf(deP->id, __VA_ARGS__) +#define LOG(level, ...) sprintf(deP->msg, __VA_ARGS__), deb(level, deP) +#define SYSERR(e) LOG(0, "%s: %s (%d)", e, strerror(errno), errno), abend(deP) +#define SSLERR(e) ssl_err(deP, e) +#define SOFTERR(e) LOG(0, e), fflush(stderr) +#define HARDERR(e) SOFTERR(e), abend(deP); +#define ERR(e) perror(e), exit(EXIT_FAILURE) + +#define PACING SIGUSR1 + +extern CSP csP; + +#endif diff -r 000000000000 -r 5c129dd80d4f CSc/Data.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSc/Data.c Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,47 @@ +#include "CS.h" + +void Data(DataP thisP, DebugP callerDeP) { + DebugP deP = &(thisP->debug); + DEBID("%s DATA", callerDeP->id); + thisP->dataLen = sizeof(HeaderT) + sizeof(PayloadT) + strlen(csP->text); + thisP->contP = (ContainerP)malloc(thisP->dataLen); + thisP->contP->hdr.ttl = 0; + thisP->contP->hdr.ts = 0; + strcpy(&(thisP->contP->payl.text), "EMPTY"); + LOG(5, "Data instance established"); +} +int dttlData(DataP thisP) { + thisP->contP->hdr.ttl--; + return thisP->contP->hdr.ttl; +} +int ttlData(DataP thisP) { + return thisP->contP->hdr.ttl; +} +int remPortData(DataP thisP) { + return thisP->contP->hdr.listPort; +} +DataP loadData(DataP thisP, int ttl, char *loadP) { + DebugP deP = &(thisP->debug); + thisP->contP->hdr.ttl = ttl; + thisP->contP->payl.ts = time(NULL); + strcpy(&(thisP->contP->payl.text), csP->text); + LOG(5, "payload loaded to container"); + return thisP; +} +char *unldData(DataP thisP) { + return &(thisP->contP->payl.text); +} +char *digest24Data(DataP thisP, char *digest) { + char *text = &(thisP->contP->payl.text); + if(strlen(text) < 24) strcpy(digest, text); + else { strncpy(digest, text, 8); strcpy(digest+8, "-------"); strcpy(digest+15, text+strlen(text)-8); } + return digest; +} +void sabotageData(DataP thisP) { + thisP->contP->payl.text = '?'; } +int chkData(DataP thisP) { + return (int)(strcmp(csP->text, &(thisP->contP->payl.text)) == 0); +} +int tsData(DataP thisP) { + return thisP->contP->hdr.ts; +} diff -r 000000000000 -r 5c129dd80d4f CSc/Debug.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSc/Debug.c Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,68 @@ +#include "CS.h" + +char *prg_name; +struct timeval t0; + +void debug_init(char *prgname) { + prg_name = prgname; + gettimeofday(&t0, NULL); +} +void pre(char *o) { + fprintf(stderr, "%s\n", o); + fflush(stderr); +} +void deb(int level, DebugP deP) { + if ((csP->debMaxLev >= level && csP->debMaxLev != 7) || (csP->debMaxLev == 7 && level == 7)) { + if(sem_wait(&(csP->shP->debugSem))< 0) ERR("LOG sem_wait"); + struct timeval t; + gettimeofday(&t, NULL); + fprintf(stderr, "%03ld%06d %s: %s\n", (long)t.tv_sec-t0.tv_sec, (int)t.tv_usec, deP->id, deP->msg); + fflush(stderr); + if(sem_post(&(csP->shP->debugSem)) < 0) ERR("LOG sem_post"); + } +} +void err(int level, char *o, DebugP deP) { + fprintf(stderr, "%s: %s\n", deP->id, o); + fflush(stderr); +} +void back_trace() { + if(sem_wait(&(csP->shP->debugSem))< 0) ERR("LOG sem_wait"); + + int j, nptrs; + void *buffer[24]; + char **strings; + + nptrs = backtrace(buffer, 24); + + strings = backtrace_symbols(buffer, nptrs); + if (strings == NULL) { + ERR("ABORT backtrace_symbols"); + exit(EXIT_FAILURE); + } + + char shcmd[] = "sed -e 's/.*\\[\\(.*\\)\\]$/\\1/' | addr2line -fspe "; + char *cmdbuf = malloc(sizeof(shcmd) + 64); + strcpy(cmdbuf, shcmd); + strcat(cmdbuf, prg_name); + #define SIZE 640 + char *resbuf = malloc(SIZE);; + for(j = 0; j < nptrs; j++) { + int pc[2], po[2]; + if(pipe(pc) < 0) ERR("ABORT pipe"); + if(pipe(po) < 0) ERR("ABORT pipe"); + if(!fork()) { + close(po[0]); dup2(po[1],1); + close(pc[1]); dup2(pc[0],0); + execl("/bin/bash", "/bin/bash", "-c", cmdbuf, (char *) NULL); + } + if(write(pc[1], strings[j], strlen(strings[j])) < 0) ERR("ABORT write pipe"); + if(write(pc[1], "\n", 1) < 0) ERR("ABORT write pipe"); + close(pc[1]); + memset(resbuf, 0, SIZE); + if(read(po[0], resbuf, SIZE)) fprintf(stderr, "%s", resbuf); + for(int i = 0; i < 2; i++) close(po[i]), close(pc[i]); + } + if(sem_post(&(csP->shP->debugSem))< 0) ERR("LOG sem_post"); + free(strings); free(cmdbuf); free(resbuf); + +} diff -r 000000000000 -r 5c129dd80d4f CSc/Makefile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSc/Makefile Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,22 @@ +S := CS.c Debug.c Node.c Data.c +O := $(S:.c=.o) +D := $(S:.c=.d) +CFLAGS = -Wall -D_GNU_SOURCE -lpthread -lm -lrt -lssl -lcrypto + +.PHONY: all clean +all: CS +clean: + rm -f CS $(O) + +CS: $(O) + gcc $(CFLAGS) $(O) -o $@ + +%.o: %.c %.d + gcc -c $(CFLAGS) -o $@ $< + +include $(D) +%.d: %.c Makefile + set -e; rm -f $@; \ + $(CC) -MM $(CPPFLAGS) $< > $@.$$$$; \ + sed 's,\($*\)\.o[ :]*,\1.o $@ : ,g' < $@.$$$$ > $@; \ + rm -f $@.$$$$ diff -r 000000000000 -r 5c129dd80d4f CSc/Node.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSc/Node.c Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,336 @@ +#include "CS.h" + +NodeP thisP; +DebugP deP; +DataP dataP; + +void sighandle(int sig) { return; } + +void bindN() { + LOG(4, "binding..."); + struct addrinfo *sa = (struct addrinfo*)malloc(sizeof(struct addrinfo)); + memset(sa, 0, sizeof(struct addrinfo)); + sa->ai_family = AF_INET; + sa->ai_socktype = SOCK_STREAM; + sa->ai_protocol = 0; + sa->ai_flags = AI_PASSIVE; + char s[64]; + sprintf(s, "%d", thisP->locPort); + int e; + if((e = getaddrinfo(NULL, s, sa, &sa)) != 0) HARDERR(gai_strerror(e)); + GAI(4, sa); + if((thisP->ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc"); + int opt = 1; + if(setsockopt(thisP->ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options"); + if(bind(thisP->ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind"); + if(listen(thisP->ssc, 1) < 0) SYSERR("listen"); + LOG(2, "bound to %d", thisP->locPort); +} +void conn(int i, int remPort) { + DebugT debug, *deP = &debug; + DEBID("%s to %u", thisP->debug.id, remPort); + thisP->cliSides[i].remPort = remPort; + int e; + LOG(4, "connecting to %u...", remPort); + int retry = csP->connThreshold; + struct addrinfo *ai = (struct addrinfo*)malloc(sizeof(struct addrinfo)); + memset(ai, 0, sizeof(struct addrinfo)); + ai->ai_family = AF_INET; + ai->ai_socktype = SOCK_STREAM; + ai->ai_protocol = 0; + ai->ai_flags = 0; + char port[6]; + sprintf(port, "%d", remPort); + if((e = getaddrinfo("localhost", port, ai, &ai)) != 0) HARDERR(gai_strerror(e)); + GAI(3, ai); + if((thisP->cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc"); + while(retry--) { + if(connect(thisP->cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) { + if(errno != ECONNREFUSED) SYSERR("connect"); + usleep(csP->connTO); + } + else break; + } + if(retry < 1) { + LOG(0, "connection refused threshold %d reached", csP->connThreshold); + exit(EXIT_FAILURE); + } + if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait"); + csP->shP->conns++; + if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); + socklen_t l = sizeof(struct sockaddr); + struct sockaddr *sa = malloc(l); + if(getpeername(thisP->cliSides[i].sc, sa, &l) < 0) SYSERR("getpeername"); + LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->cliSides[i].sc); free(sa); + + if(thisP->ssl) { + ERR_clear_error(); + if(!(thisP->cliSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL"); + if(!SSL_set_fd(thisP->cliSides[i].sslP, thisP->cliSides[i].sc)) SSLERR("client SSL set fd"); + if((e = SSL_connect(thisP->cliSides[i].sslP)) < 1) { + switch(SSL_get_error(thisP->cliSides[i].sslP, e)) { + case SSL_ERROR_SYSCALL: SYSERR("SSL connect"); break; + default: SSLERR("SSL connect"); break; + } + } + } + LOG(2, "connected via sc=%d after %d retries", thisP->cliSides[i].sc, csP->connThreshold - (retry + 1)); +} +void acc(int i) { + LOG(4, "accepting..."); + if((thisP->srvSides[i].sc = accept(thisP->ssc, NULL, NULL)) < 0) SYSERR("accept"); + socklen_t l = sizeof(struct sockaddr); + struct sockaddr *sa = malloc(l); + if(getpeername(thisP->srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername"); + LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->srvSides[i].sc); free(sa); + if(thisP->ssl) { + int e; + ERR_clear_error(); + if(!(thisP->srvSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL"); + if(!SSL_set_fd(thisP->srvSides[i].sslP, thisP->srvSides[i].sc)) SSLERR("server SSL set fd"); + if((e = SSL_accept(thisP->srvSides[i].sslP)) < 1) { + switch(SSL_get_error(thisP->srvSides[i].sslP, e)) { + case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break; + default: SSLERR("SSL accept"); break; + } + } + } + LOG(2, "accepted"); +} +void closeN(int i, nodeside side) { + SocketP sc; + if(side) sc = thisP->srvSides; else sc = thisP->cliSides; + LOG(5, "closing sc=%d...", sc[i].sc); + if(thisP->ssl) { + int e; + if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)"); + if(!e) { + LOG(5, "SSL shutdown rc=0"); + if((e = SSL_shutdown(sc[i].sslP)) < 0) { + switch(SSL_get_error(sc[i].sslP, e)) { + case SSL_ERROR_SYSCALL: + if(!(e = ERR_get_error())) { + if(errno) SYSERR("SSL shutdown (2)"); + break; + } + break; + default: SSLERR("SSL shutdown (2)"); break; + } + } + } + } + close(sc[i].sc); + LOG(4, "closed sc=%d", sc[i].sc); + sc[i].sc = -1; +} +void *close_clients() { + DebugT debug, *deP = &debug; + DEBID("%s CLOSE clients", thisP->debug.id); + LOG(5, "start..."); + for (int i = 0; i < thisP->nodes; i++) if(thisP->cliSides[i].sc > -1) closeN(i, client); + LOG(4, "all clients closed"); + pthread_exit(NULL); +} +void close_node() { + if(!thisP->closing) { + thisP->closingThread = 0; + if(pthread_create(&thisP->closingThread, NULL, &close_clients, NULL) != 0) SYSERR("create thread"); + thisP->closing = 1; + } +} +int readN(int i) { + LOG(5, "ready to read len=%d from sc=%d...", dataP->dataLen, thisP->srvSides[i].sc); + int n, rest = dataP->dataLen; + void *buf = dataP->contP; + while(rest > 0) { + if(thisP->ssl) { if((n = SSL_read(thisP->srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); } + else { if((n = read(thisP->srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); } + if(n == 0) { + LOG(4, "read EOF"); + return 0; + } + else { + buf += n; rest -= n; + if(rest > 0) LOG(5, "partly read %d bytes", n); + } + } + if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait"); + csP->shP->msgs++; + if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); + LOG(5, "read %d from %u", dataP->dataLen, dataP->contP->hdr.listPort); + return dataP->dataLen; +} +int writeN(int i) { + DebugT debug, *deP = &debug; + DEBID("%s to %u", thisP->debug.id, thisP->cliSides[i].remPort); + LOG(5, "ready to write len=%d to sc=%d...", dataP->dataLen, thisP->cliSides[i].sc); + int n, rest = dataP->dataLen; + void *buf = dataP->contP; + while(rest > 0) { + if(thisP->ssl) { if((n = SSL_write(thisP->cliSides[i].sslP, buf, rest)) < 0) SSLERR("socket write"); } + else { if((n = write(thisP->cliSides[i].sc, buf, rest)) < 0) SYSERR("socket write"); } + buf += n; rest -= n; + if(rest > 0) LOG(5, "partly written %d bytes", n); + } + LOG(5, "written %d", dataP->dataLen); + return dataP->dataLen; +} +int getN(int i) { + return readN(i) > 0; +} +int putN(int i) { + dataP->contP->hdr.listPort = thisP->locPort; + return writeN(i) > 0; +} +int next_node() { + int next; + if(thisP->topo == ring) { + next = thisP->locPort + 1; + if(next > thisP->last) next = thisP->first; + } + else while((next = thisP->first + thisP->nodes * ((float)random() / RAND_MAX)) == thisP->locPort); + return next; +} +void forward(int sci) { + int next, scn; + char digest[24]; + if(getN(sci)) { + LOG(5, "received data from %u", remPortData(dataP)); + if(thisP->kicker) { + LOG(4, "received from node %u: %s, ttl=%d", + remPortData(dataP), digest24Data(dataP, digest), ttlData(dataP)); +// if(ttlData(dataP) == 2) sabotageData(dataP); +// if(ttlData(dataP) == 2) errno=0, SYSERR("signal test"); + if(dttlData(dataP) <= 0) { + LOG(1, "received after passing all %s: %s", thisP->topo==mash ? "mashes" : "rings",digest24Data(dataP, digest)); + close_node(); + *(thisP->forw) = 0; + LOG(4, "leaving forward closing"); + return; + } + } + next = next_node(); scn = next - thisP->first; + LOG(5, "forwarding to %d, len=%d, ttl=%d --->", next, dataP->dataLen, ttlData(dataP)); + if(thisP->cliSides[scn].sc < 0) conn(scn, next); + if(*(thisP->forw) && csP->pacing) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); } + putN(scn); + LOG(5, "forwarded to %u", next); + } + else { + close_node(); + closeN(sci, server); + } + return; +} +void main_loop() { + sigset_t pacing; + sigemptyset(&pacing); + sigaddset(&pacing, PACING); + union { // simple select mask debug + fd_set rs; + uint mask; + } u; + int nfds; + FD_ZERO(&(u.rs)); nfds = 0; + if(*(thisP->forw)) { + FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; } + while(nfds) { + struct timeval t = {csP->selTO, 0}; + LOG(5, "selecting, mask=%08x", u.mask); + int rc; + rc = select(nfds, &(u.rs), NULL, NULL, &t); + if(rc < 0 && errno != EINTR) SYSERR("select"); + if(rc > 0) { + LOG(5, "return from select, mask=%08x", u.mask); + if(FD_ISSET(thisP->ssc, &(u.rs))) { // ssc posted: accept & forward + int i; + for(i=0; thisP->srvSides[i].sc > -1 && i < thisP->nodes; i++); // find unused slot for accept + if(i == thisP->nodes) HARDERR("can't accept, all slots in use"); + LOG(5, "slot for accept=%d", i); + acc(i); + forward(i); + } + else // check which connected socket is posted + for(int i = 0; i < thisP->nodes; i++) + if(thisP->srvSides[i].sc > -1 && FD_ISSET(thisP->srvSides[i].sc, &(u.rs))) forward(i); + } + FD_ZERO(&(u.rs)); nfds = 0; + if(*(thisP->forw)) { FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; } + for(int i = 0; i < thisP->nodes; i++) { // mask all connected client side sockets for select + int sc = thisP->srvSides[i].sc; + if(sc > -1) { FD_SET(sc, &(u.rs)); if(sc >= nfds) nfds = sc + 1; } + } + } +} +void Node(topology topo, int *forw, int port, int first, int n, int ssl) { + NodeT this; + thisP = &this; + deP = &(thisP->debug); + DEBID("%sSSL %s node %d", ssl ? "" : "non", topo==mash ? "MASH" : "RING", port); + LOG(4, "initializing..."); + dataP = &thisP->data; + Data(dataP, deP); + thisP->topo = topo; + thisP->locPort = port; + thisP->first = first; + thisP->last = first + n - 1; + thisP->nodes = n; + thisP->cliSides = malloc(n*sizeof(SocketT)); + thisP->srvSides = malloc(n*sizeof(SocketT)); + for(int k=0; kcliSides[k].sc = thisP->srvSides[k].sc = -1; + thisP->kicker = (port == first); + thisP->forw = forw; + thisP->nodeIdx = port - first; + thisP->closing = 0; + thisP->ssl = ssl; + if(thisP->ssl) { + char s[64]; + SSL_load_error_strings(); + SSL_library_init(); + LOG(4, "setting SSL contex..."); + if(!(thisP->ctx = SSL_CTX_new(TLS_method()))) SSLERR("new SSL CTX"); + SSL_CTX_set_mode(thisP->ctx, SSL_MODE_AUTO_RETRY); + SSL_CTX_set_verify(thisP->ctx, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL); + sprintf(s, "%s/keys/%d.key", csP->ceP, thisP->locPort); + if(SSL_CTX_use_PrivateKey_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file"); + sprintf(s, "%s/certs/%d.pem", csP->ceP, thisP->locPort); + LOG(5, "SSL private key used: %s", s); + if(SSL_CTX_use_certificate_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file"); + LOG(5, "SSL certificate used: %s", s); + if(SSL_CTX_load_verify_locations(thisP->ctx, NULL, csP->caP) != 1) SSLERR("hh's thrusted certs path"); + } + LOG(5, "initalized"); + + bindN(thisP); + if(thisP->kicker) { + loadData(dataP, csP->ttl, csP->text); + int sci, next; + next = next_node(); sci = next - thisP->first; + char digest[24]; + LOG(1, "KICKER: ready to initial send %s, len=%d to node %d", digest24Data(dataP, digest), dataP->dataLen, next); + conn(sci, next); + putN(sci); + } + + main_loop(); + + LOG(5, "closing ssc"); + close(thisP->ssc); + if(thisP->closing) { // wait for closing thread + if(pthread_join(thisP->closingThread, NULL) != 0) SYSERR("join closing thread"); } + struct sigaction sigact; + sigfillset(&sigact.sa_mask); + sigact.sa_handler=sighandle; + if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction"); + if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait"); + int active = --csP->shP->act; + if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post"); + if(active > 0) pause(); + else kill(0, SIGUSR2); + int exitRc = EXIT_SUCCESS; + if(thisP->kicker && !chkData(dataP)) { + SOFTERR("INPUT AND OUTPUT DIFFER"); + exitRc = EXIT_FAILURE; } + LOG(2, "ended"); + exit(exitRc); +} diff -r 000000000000 -r 5c129dd80d4f CScpp/CS.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CScpp/CS.cpp Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,519 @@ +#include "CS.h" + +CSP csP; + +void sighandle(int sig) { return; } +void abend(DebugP deP) { + int pid; + //LOG(0, "ABORT, netstat:"); + //if(!(pid = fork())) { + // execl("/bin/bash", "/bin/bash", "-c", "netstat -n --inet -a | grep 1[123][0-9][0-9][0-9] >net_stat", (char *) NULL); + // exit(EXIT_SUCCESS); + //} + waitpid(pid, NULL, 0); + LOG(0, "ABORT, backtrace:"); + deP->back_trace(); + kill(0, SIGTERM); + exit(EXIT_FAILURE); +} +char *gpa(struct sockaddr *ai_addr) { // returns string with IP4 address & port assigned to the socket + char *s = (char*) malloc(64); + unsigned short port = *(unsigned short*) ai_addr->sa_data; + char *a = ai_addr->sa_data + 2; + sprintf(s, "%hhu.%hhu.%hhu.%hhu %hu", a[0], a[1], a[2], a[3], ntohs(port)); + return s; +} +void gai(int level, struct addrinfo *ai, DebugP deP) { // logs assigned IP4 addresses from addrinfo chain + struct addrinfo *sa = ai; + if (deP->debug >= level) do { + char *s = gpa(sa->ai_addr); + strcpy(deP->s, s); free(s); + deP->deb(level); + } while ((sa = sa->ai_next)); + fflush(stderr); +} +void ssl_err(char *s, DebugP deP) { + long e = ERR_get_error(); + while(e) { + LOG(0, "%s: %s", s, ERR_error_string(e, NULL)); + e = ERR_get_error(); + } +} +static int getArg(const char *a) { + return (getenv(a) != NULL) ? atoi(getenv(a)) : 0; } +HeaderS::HeaderS() { + this->ttl = 0; } +HeaderS::HeaderS(int ttl) { + this->ttl = ttl; } +int HeaderS::len() { + return sizeof(HeaderS); } +PayloadS::PayloadS() { + strcpy(&text, "EMPTY"); } +PayloadS::PayloadS(const char *text) { + ts = time(NULL); + strcpy(&(this->text), text); } +int PayloadS::check(PayloadP p) { + return (int)(strcmp(&text, &p->text) == 0); } +char *PayloadS::deliver() { + return &text; } +string PayloadS::digest() { + string payl_s = string(&text); + if(payl_s.size() < 24) return payl_s; + else return payl_s.substr(0, 8) + string("--------") + payl_s.substr(payl_s.size() - 8, 8); } +void PayloadS::sabotage() { + text = '?'; } +int PayloadS::len() { + return sizeof(PayloadS) + strlen(&text); } +int ContainerS::len() { + return hdr.len() + payl.len(); } +DataC::DataC() {} +DataC::DataC(DebugP callerDeP) { + deP = new DebugC(); + DEBID("%s DATA", callerDeP->debid); + dataLen = sizeof(HeaderS) + sizeof(PayloadS) + csP->text.size(); + contP = static_cast(operator new(dataLen)); + new(&contP->hdr) HeaderS(); + new(&contP->payl) PayloadS(); + LOG(5, "Empty Data instance established"); +} +void DataC::load(int ttl, const char *text) { + new(&contP->hdr) HeaderS(ttl); + new(&contP->payl) PayloadS(text); + LOG(5, "Data instance loaded with payload"); + return; +} +string DataC::unld() { + return string(&(contP->payl.text)); } +string DataC::digest() { + return contP->payl.digest(); } +int DataC::dttl() { + return --contP->hdr.ttl; } +int DataC::ttl() { + return contP->hdr.ttl; } +bool DataC::dataOk() { + return csP->text == string(&(contP->payl.text)); } +int DataC::ts() { + return contP->hdr.ts; } +int DataC::remPort(int remPort) { + return (contP->hdr.remPort = remPort); } +int DataC::remPort() { + return contP->hdr.remPort; } +NodeC::NodeC(ConstellationP co, int port) { + deP = new DebugC(); + DEBID("%sSSL %s node %d", co->ssl ? "" : "non", co->topo==mash ? "MASH" : "RING", port); + LOG(4, "intializing ..."); + data = DataC(deP); + topo = co->topo; + locPort = port; + first = co->first; + nodes = co->nodes; + last = first + nodes - 1; + cliSides = new SocketS[nodes]; + srvSides = new SocketS[nodes]; + for(int k=0; kforwP; + closing = 0; + ssl = co->ssl; + ssc = 0; + if(ssl) { + char s[128]; + SSL_load_error_strings(); + SSL_library_init(); + LOG(4, "setting SSL contex..."); + if(!(ctxP = SSL_CTX_new(SSLv23_method()))) SSLERR("new SSL CTX"); + SSL_CTX_set_mode(ctxP, SSL_MODE_AUTO_RETRY); + SSL_CTX_set_verify(ctxP, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL); // SSL will claim partner certificate + sprintf(s, "%s/keys/%d.key", csP->cePath.c_str(), locPort); + LOG(5, "SSL private key used: %s", s); + if(SSL_CTX_use_PrivateKey_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file"); + sprintf(s, "%s/certs/%d.pem", csP->cePath.c_str(), locPort); + LOG(5, "SSL certificate used: %s", s); + if(SSL_CTX_use_certificate_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file"); + LOG(5, "SSL: CApath: %s", csP->caPath.c_str()); + if(SSL_CTX_load_verify_locations(ctxP, NULL, csP->caPath.c_str()) != 1) SSLERR("hh's thrusted certs path"); + } + LOG(5, "initalized"); +} +int NodeC::run() { + LOG(5, "binding, kicker=%d", kicker); + bindN(); + if(kicker) { + data.load(csP->ttl, csP->text.c_str()); + int sci, next; + next = next_node(); sci = next - first; + LOG(2, "ready to initial send %s, len=%d to node %d", data.digest().c_str(), data.dataLen, next); + conn(sci, next); + putN(sci); + } + mainLoop(); + LOG(5, "closing ssc"); + close(ssc); + if(closing) closingThread.join(); // wait for closing thread + struct sigaction sigact; + sigfillset(&sigact.sa_mask); + sigact.sa_handler = sighandle; + if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction"); + if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait"); + if(--csP->shP->act == 0) { + if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post"); + kill(0, SIGUSR2); + } + else { + if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); + pause(); + } + int exitRc = EXIT_SUCCESS; + if(kicker && !data.dataOk()) { + SOFTERR("input and output differ"); + exitRc = EXIT_FAILURE; + } + LOG(2, "ended"); + exit(exitRc); +} +void NodeC::mainLoop() { + fd_set rs; + int nfds; + struct timeval t = {1, 0}; + nfds = 0; FD_ZERO(&rs); + if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; } + while(nfds) { + t.tv_sec = csP->selTO; + int rc; + rc = select(nfds, &rs, NULL, NULL, &t); + if(rc < 0 && errno != EINTR) SYSERR("select"); + if(rc > 0) { + if(FD_ISSET(ssc, &rs)) { + int i; + for(i=0; srvSides[i].sc > -1 && i < nodes; i++); // find unused slot for accept + if(i == nodes) HARDERR("can't accept, all slots in use"); + LOG(5, "slot for accept=%d", i); + acc(i); + forward(i); + } + else + for(int i = 0; i < nodes; i++) { + if(srvSides[i].sc > -1 && FD_ISSET(srvSides[i].sc, &rs)) forward(i); } + } + FD_ZERO(&rs); nfds = 0; t.tv_sec = 1; + for(int i = 0; i < nodes; i++) { + int sc = srvSides[i].sc; + if(sc > -1) { FD_SET(sc, &rs); if(sc >= nfds) nfds = sc + 1; } + } + if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; } + } +} +void NodeC::forward(int sci) { + int next, scn; + if(getN(sci)) { + LOG(5, "received data from %u", data.remPort()); + if(kicker) { + LOG(3, "received from node %u: %s, ttl=%d", data.remPort(), data.digest().c_str(), data.ttl()); + if(data.dttl() <= 0) { + LOG(1, "received after passing all %s: %s", topo==mash ? "mashes" : "rings", data.digest().c_str()); + closeClients(); + *(forwP) = 0; + LOG(4, "leaving forward closing"); + return; + } + } + next = next_node(); scn = next - first; + LOG(5, "forwarding len=%d to %d --->", data.dataLen, next); + if(cliSides[scn].sc < 0) conn(scn, next); + if(*(forwP)) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); } + putN(scn); + LOG(5, "forwarded to %u", next); + } + else { + closeClients(); + closeSocket(sci, server); + } + return; +} +void NodeC::bindN() { + LOG(4, "binding..."); + struct addrinfo *sa = (struct addrinfo*)malloc(sizeof(struct addrinfo)); + memset(sa, 0, sizeof(struct addrinfo)); + sa->ai_family = AF_INET; + sa->ai_socktype = SOCK_STREAM; + sa->ai_protocol = 0; + sa->ai_flags = AI_PASSIVE; + char s[64]; + sprintf(s, "%d", locPort); + int e; + if((e = getaddrinfo(NULL, s, sa, &sa)) != 0) HARDERR(gai_strerror(e)); + GAI(4, sa); + if((ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc"); + int opt = 1; + if(setsockopt(ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options"); + if(bind(ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind"); + if(listen(ssc, 1) < 0) SYSERR("listen"); + LOG(2, "bound to %d", locPort); +} +void NodeC::conn(int i, int remPort) { + DebugP deP = new DebugC(); + DEBID("%s to %u", this->deP->debid, remPort); + cliSides[i].remPort = remPort; + int e; + LOG(4, "connecting to %u...", remPort); + int retry = csP->connThreshold; + struct addrinfo *ai = (struct addrinfo*)malloc(sizeof(struct addrinfo)); + memset(ai, 0, sizeof(struct addrinfo)); + ai->ai_family = AF_INET; + ai->ai_socktype = SOCK_STREAM; + ai->ai_protocol = 0; + ai->ai_flags = 0; + char port[6]; + sprintf(port, "%d", remPort); + if((e = getaddrinfo("localhost", port, ai, &ai)) != 0) HARDERR(gai_strerror(e)); + GAI(3, ai); + if((cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc"); + while(retry--) { + if(connect(cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) { + if(errno != ECONNREFUSED) SYSERR("connect"); + usleep(csP->connTO); } + else break; } + if(retry < 1) { + LOG(0, "connection refused threshold %d reached", csP->connThreshold); + exit(EXIT_FAILURE); } + if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait"); + csP->shP->conns++; + if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); + socklen_t l = sizeof(struct sockaddr); + struct sockaddr *sa = (sockaddr*)malloc(l); + if(getpeername(cliSides[i].sc, sa, &l) < 0) SYSERR("getpeername"); + LOG(4, "peer: %s on sc=%d", gpa(sa), cliSides[i].sc); free(sa); + if(ssl) { + ERR_clear_error(); + if(!(cliSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL"); + if(!SSL_set_fd(cliSides[i].sslP, cliSides[i].sc)) SSLERR("client SSL set fd"); + if((e = SSL_connect(cliSides[i].sslP)) < 1) { + switch(SSL_get_error(cliSides[i].sslP, e)) { + case SSL_ERROR_SYSCALL: + ssl_err((char*)"SSL connect", deP); + if(e == 0) LOG(0, "SSL connect: EOF on socket"); + else LOG(0, "SSL connect: %s (%d)", strerror(errno), errno); + abend(deP); + break; + default: + SSLERR("SSL connect"); + break; + } + } + } + LOG(2, "connected via sc=%d after %d retries", cliSides[i].sc, csP->connThreshold - (retry + 1)); +} +void NodeC::acc(int i) { + LOG(4, "accepting..."); + if((srvSides[i].sc = accept(ssc, NULL, NULL)) < 0) SYSERR("accept"); + socklen_t l = sizeof(struct sockaddr); + struct sockaddr *sa = (sockaddr*)malloc(l); + if(getpeername(srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername"); + LOG(4, "peer: %s on sc=%d", gpa(sa), srvSides[i].sc); free(sa); + if(ssl) { + int e; + ERR_clear_error(); + if(!(srvSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL"); + if(!SSL_set_fd(srvSides[i].sslP, srvSides[i].sc)) SSLERR("server SSL set fd"); + if((e = SSL_accept(srvSides[i].sslP)) < 1) { + switch(SSL_get_error(srvSides[i].sslP, e)) { + case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break; + default: SSLERR("SSL accept"); break; + } + } + } + LOG(2, "accepted"); +} +void NodeC::closeSocket(int i, nodeside side) { + SocketP sc; + if(side) sc = srvSides; else sc = cliSides; + LOG(5, "closing sc=%d...", sc[i].sc); + if(ssl) { + int e; + if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)"); + if(!e) { + LOG(5, "SSL shutdown rc=0"); + if((e = SSL_shutdown(sc[i].sslP)) < 0) { + switch(SSL_get_error(sc[i].sslP, e)) { + case SSL_ERROR_SYSCALL: { + long e; + if(!(e = ERR_get_error()) && errno) SYSERR("SSL shutdown (2)"); + break; + } + default: + SSLERR("SSL shutdown (2)"); + break; + } + } + } + } + close(sc[i].sc); + LOG(4, "closed sc=%d", sc[i].sc); + sc[i].sc = -1; +} +void closeCliTh(void *p) { + NodeP nP = (NodeP)p; + char *callerid = nP->deP->debid; + DebugP deP = new DebugC(); + DEBID("%s CLOSE clients thread", callerid); + LOG(5, "start..."); + for (int i = 0; i < nP->nodes; i++) if(nP->cliSides[i].sc > -1) nP->closeSocket(i, client); + LOG(4, "all clients closed"); +} +void NodeC::closeClients() { + if(!closing) try { closingThread = thread(closeCliTh, this); } catch(exception e) { + cout << "closing tread: " << e.what() << '\n'; } + closing = 1; +} +int NodeC::readN(int i) { + LOG(5, "to read len=%d from sc=%d...", data.dataLen, srvSides[i].sc); + int n, rest = data.dataLen; + char *buf = (char*)data.contP; + while(rest > 0) { + if(ssl) { if((n = SSL_read(srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); } + else { if((n = read(srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); } + if(n == 0) { + LOG(4, "read EOF"); + return 0; + } + else { + buf += n; rest -= n; + if(rest > 0) LOG(5, "partly read %d bytes, buf=%p", n, buf); + } + } + if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait"); + csP->shP->msgs++; + if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post"); + LOG(5, "read %d from %u", data.dataLen, data.remPort()); + return (data.dataLen); +} +int NodeC::writeN(int i) { + LOG(5, "to write len=%d to sc=%d...", data.dataLen, cliSides[i].sc); + int n, rest = data.dataLen; + char *buf = (char*)data.contP; + while(rest > 0) { + if(ssl) { if((n = SSL_write(cliSides[i].sslP, data.contP, data.dataLen)) < 0) SSLERR("socket write"); } + else { if((n = write(cliSides[i].sc, data.contP, data.dataLen)) < 0) SYSERR("socket write"); } + buf += n; rest -= n; + if(rest > 0) LOG(5, "partly written %d bytes", n); + } + LOG(5, "written %d", data.dataLen); + return data.dataLen; +} +int NodeC::getN(int i) { return readN(i) > 0; } +int NodeC::putN(int i) { + data.remPort(locPort); + return writeN(i) > 0; } +int NodeC::next_node() { + int next; + if(topo == ring) { + next = locPort + 1; + if(next > last) next = first; + } + else while((next = first + nodes * ((float)random() / RAND_MAX)) == locPort); + return next; +} +ConstellationC::ConstellationC() { + deP = NULL; + forwP = NULL; + topo = ring; + ssl = first = nodes = 0; +}; +ConstellationC::ConstellationC(topology topo, int ssl) { + deP = new DebugC(); + DEBID("%sSSL %s", ssl ? "" : "non", topo==mash ? "MASH" : "RING"); + this->topo = topo; + if(topo == ring) { first=csP->rp0; nodes=csP->rn; } + if(topo == mash) { first=csP->mp0; nodes=csP->mn; } + this->ssl = ssl; + first += ssl*500; + forwP = NULL; +} +int ConstellationC::run() { + int stat = 0, pid = 0, exitRc = EXIT_SUCCESS; + if(nodes == 0) exit(0); + if(nodes == 1) { LOG(0, "1 node configuration not implemented"); exit(0); } + pid_t *pids = new pid_t[nodes]; + LOG(1, "%d nodes starting...", nodes); + if((forwP = (int*)mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap"); + *forwP = 1; + for(int port = first; port < first + nodes; port++) { + if(!(pid = fork())) (new NodeC(this, port))->run(); + else { pids[port-first] = pid; LOG(4, "node %u established in process %u", port, pid); } + } + LOG(2, "all nodes established"); + while((pid = wait(&stat)) > 0) + if(WIFEXITED(stat)) { + LOG(4, "node process %u ended with exit(%d)", pid, WEXITSTATUS(stat)); + if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE; + } + else exitRc = EXIT_FAILURE; + LOG(1, "ENDED %s", exitRc == EXIT_SUCCESS ? "OK" : "with ERROR"); + exit(exitRc); +} +CSS::CSS(DebugP deP, char *prgnP) { + ShareA(shP); + shP->msgs = 0; + shP->conns = 0; + if(sem_init(&shP->counterSem, 1, 1) < 0) SYSERR("sem_init"); + text = "bla bla"; + ttl = 3; + rp0 = 11000; + mp0 = 12000; + rn = 0; + mn = 0; + issl = 0; + const string sslPathSuffP = "../CS"; + caPath = "/home/local/etc/ssl/certs/"; + connThreshold = 77; // connection retries threshold + connTO = 0.01 * 1000*1000; // connection sleep time in usecs + selTO = 1; // selection timeout in secs + + if(getArg("DEB") >= 0) DebugC::debug = getArg("DEB"); + if(getenv("T") != NULL) { text = getenv("T"); } + if(getenv("CEP") != NULL) cePath = getenv("CEP"); + else { cePath = dirname(prgnP); cePath += "/"; cePath += sslPathSuffP; } + if(getenv("CAP") != NULL) caPath = getenv("CAP"); + if(getArg("TTL") > 0) ttl = getArg("TTL"); + if(getArg("RP0") > 0) rp0 = getArg("RP0"); + if(getArg("MP0") > 0) mp0 = getArg("MP0"); + if(getArg("N") >= 0) { mn = getArg("N"); rn = mn; } + if(getArg("SSL") >= 0) issl = getArg("SSL"); + if(getArg("RN") >= 0) rn = getArg("RN"); + if(getArg("MN") >= 0) mn = getArg("MN"); + shP->act = rn + mn; // initialize active node processes counter + if(issl > 1) shP->act += shP->act; + pacing = 0; + if(getenv("P") != NULL) { + double d = atof(getenv("P")); + pace.tv_sec=(time_t)trunc(d); + pace.tv_nsec=(d-pace.tv_sec)*1000*1000*1000; + if(pace.tv_sec > 0 || pace.tv_nsec > 0) pacing = 1; + } + if(getArg("RS") >= 0) srandom(getArg("RS")); +} +int main(int argc, char *argv[]) { + DebugC::debug_init(argv[0]); + DebugP deP = new DebugC(); + DEBID("client/server demo"); + csP = new CSS(deP, argv[0]); + LOG(1, "pgm=%s, ttl=%u, pace=%lu.%03lu, seed=%u, SSL=%u, debug=%d",\ + argv[0], csP->ttl, csP->pace.tv_sec, csP->pace.tv_nsec/(1000*1000), getArg("RS"), csP->issl, DebugC::debug); + if(csP->issl > 0) LOG(3, "certs path=%s, CA certs path=%s", csP->cePath.c_str(), csP->caPath.c_str()); + if(csP->issl < 2) { + if(!fork()) (new ConstellationC(ring, csP->issl))->run(); + if(!fork()) (new ConstellationC(mash, csP->issl))->run(); + } else for(int ssl = 0; ssl < csP->issl; ssl++) { + if(!fork()) (new ConstellationC(ring, ssl))->run(); + if(!fork()) (new ConstellationC(mash, ssl))->run(); + } + int stat, exitRc = EXIT_SUCCESS; + while(wait(&stat) > 0) + if(WIFEXITED(stat)) { + LOG(5, "constellation ended with exit(%d)", WEXITSTATUS(stat)); + if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE; + } + else exitRc = EXIT_FAILURE; + LOG(1, "%s end, forwards=%d, connections=%d", exitRc == EXIT_SUCCESS ? "NORMAL" : "BAD", csP->shP->msgs, csP->shP->conns); + exit(exitRc); +} diff -r 000000000000 -r 5c129dd80d4f CScpp/CS.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CScpp/CS.h Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,171 @@ +#ifndef _CSH_ +#define _CSH_ 1 + +using namespace std; +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "Debug.h" + +typedef struct ShareS { // shared items between procs or threads + int conns; // overall connections# both in ring and mash + int msgs; // overall forwards# both in ring and mash + int act; // active nodes# both in ring and mash + sem_t counterSem; // semaphore for counters +} ShareT, *ShareP; +#define ShareA(v) if((v = (ShareP) mmap(NULL, sizeof(ShareT), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap") + +typedef struct CSS { // top level attributes + string text; // text to be sent in messages + int ttl; // TTL for circulating msgs + int mp0; // TCP port of first mash node + int mn; // intended # of nodes in mash + int rp0; // TCP port of first ring node + int rn; // intended # of nodes in ring + struct timespec pace; // pacing time quantum + int pacing; // pacing indicator + int issl; // ssl switch: 0=no_ssl, 1=ssl, 2=both + int connThreshold; // connection retry threshhold + int connTO; // connection timeout in usecs + int selTO; // selection timeout in secs + string cePath; // SSL certs path + string caPath; // CA certs path + ShareP shP; + CSS(DebugP, char*); +} *CSP; + +typedef struct HeaderS { // container header + int ttl; + int ts; + int remPort; + + HeaderS(); + HeaderS(int); + int len(); +} *HeaderP; + +typedef struct PayloadS { // payload structure + time_t ts; + char text; + + PayloadS(); + PayloadS(const char *); + int check(PayloadS*); + char *deliver(); + string digest(); + void sabotage(); + int len(); +} *PayloadP; + +typedef struct ContainerS { // data container sent throug mash or ring + HeaderS hdr; + PayloadS payl; + int len(); +} ContainerT, *ContainerP; + +typedef class DataC { // data sent and received +public: + DebugP deP; + ContainerP contP; + int dataLen; +// int transferLen; + + DataC(); + DataC(DebugP); + int dttl(); + int ttl(); + void load(int, const char*); + string unld(); + string digest(); + bool dataOk(); + int ts(); + int remPort(); + int remPort(int); +} *DataP; + +typedef struct SocketS { // info about sockets allocated in node + int remPort; + int sc; + SSL *sslP; +} *SocketP; + +typedef enum{ring, mash} topology; +typedef enum{client, server} nodeside; +class ConstellationC { // constellation of communication nodes (mash or ring) + DebugP deP; +public: + topology topo; + int ssl, first, nodes, *forwP; + + ConstellationC(); + ConstellationC(topology, int); + int run(); +}; +typedef ConstellationC *ConstellationP; + +class NodeC : public ConstellationC { // attributes and operations of one node of constellation (mash or ring) +public: + DebugP deP; + int locPort; + int last; + int kicker; + int closing; + thread closingThread; // thread to close client side sockets + DataC data; + int len; + int ssc; + SocketP cliSides; + SocketP srvSides; + SSL_CTX *ctxP; + + NodeC(ConstellationP, int); + int run(); + void mainLoop(); + void bindN(); + void conn(int, int); + void acc(int); + void closeSocket(int, nodeside); + int getN(int); + int readN(int); + int putN(int); + int writeN(int); + int next_node(); + void forward(int); + void closeClients(); +}; +typedef NodeC *NodeP; + +extern char *gpa(struct sockaddr *); +extern void gai(int, struct addrinfo *ai, DebugP); +#define GAI(level, ai) gai(level, ai, deP) +extern void ssl_err(char*); +extern void abend(); + +#define SYSERR(e) LOG(0, "%s: %s (%d)", (char*)e, strerror(errno), errno), abend(deP) +#define SSLERR(e) ssl_err((char*)e, deP), abend(deP) +#define SOFTERR(e) LOG(0, e), fflush(stderr) +#define HARDERR(e) SOFTERR(e), abend(deP); + +extern CSP csP; + +#endif diff -r 000000000000 -r 5c129dd80d4f CScpp/Debug.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CScpp/Debug.cpp Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,80 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "Debug.h" +#define ERR(e) perror(e), kill(0, SIGTERM), exit(EXIT_FAILURE) + +// independent utility staff + +int DebugC::debug = 0; +char *DebugC::prg_name; +long int DebugC::t0; +sem_t *DebugC::semP; + +void DebugC::deb(int level) { + if (debug >= level) { + if(sem_wait(semP)< 0) ERR("LOG sem_wait"); + struct timeval t; + gettimeofday(&t, NULL); + fprintf(stderr, "%09ld %s: %s\n", 1000000*t.tv_sec+t.tv_usec-t0, debid, s); + fflush(stderr); + if(sem_post(semP) < 0) ERR("LOG sem_post"); + } +} +void DebugC::back_trace() { + if(sem_wait(semP)< 0) ERR("LOG sem_wait"); + + int j, nptrs; + void *buffer[24]; + char **strings; + + nptrs = backtrace(buffer, 24); + + strings = backtrace_symbols(buffer, nptrs); + if (strings == NULL) { + ERR("ABORT backtrace_symbols"); + exit(EXIT_FAILURE); + } + + char shcmd[] = "sed -e 's/.*\\[\\(.*\\)\\]$/\\1/' | addr2line -fspe "; + char *cmdbuf = (char*)malloc(sizeof(shcmd) + 64); + strcpy(cmdbuf, shcmd); + strcat(cmdbuf, prg_name); + #define SIZE 640 + char *resbuf = (char*)malloc(SIZE);; + for(j = 0; j < nptrs; j++) { + int pc[2], po[2]; + if(pipe(pc) < 0) ERR("ABORT pipe"); + if(pipe(po) < 0) ERR("ABORT pipe"); + if(!fork()) { + close(po[0]); dup2(po[1],1); + close(pc[1]); dup2(pc[0],0); + execl("/bin/bash", "/bin/bash", "-c", cmdbuf, (char *) NULL); + } + if(write(pc[1], strings[j], strlen(strings[j])) < 0) ERR("ABORT write pipe"); + if(write(pc[1], "\n", 1) < 0) ERR("ABORT write pipe"); + close(pc[1]); + memset(resbuf, 0, SIZE); + if(read(po[0], resbuf, SIZE)) fprintf(stderr, "%s", resbuf); + for(int i = 0; i < 2; i++) close(po[i]), close(pc[i]); + } + if(sem_post(semP)< 0) ERR("LOG sem_post"); + free(strings); free(cmdbuf); free(resbuf); +} +void DebugC::debug_init(char *prgname) { + struct timeval t; + DebugC::prg_name = prgname; + gettimeofday(&t, NULL); + DebugC::t0 = 1000000*t.tv_sec+t.tv_usec; + if((DebugC::semP = (sem_t*)mmap(NULL, sizeof(sem_t), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) ERR("mmap"); + if(sem_init(DebugC::semP, 1, 1) < 0) ERR("LOG sem_init"); +} + diff -r 000000000000 -r 5c129dd80d4f CScpp/Debug.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CScpp/Debug.h Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,25 @@ +#ifndef _DebugH_ +#define _DebugH_ 1 + +class DebugC { // independent utility staff + public: + static int debug; // debug log level + static char *prg_name; // not used at the moment + static long int t0; + static sem_t semaphore; // semafor for log timestamping + static sem_t *semP; // -->semafor for log timestamping + static void debug_init(char *prgname); + + char s[256]; // debug msg workspace + char debid[128]; // debug ID of process + + void pre(char *o); + void deb(int level); + void err(int level, char *o); + void back_trace(); +}; +typedef DebugC *DebugP; + +#define DEBID(...) sprintf(deP->debid, __VA_ARGS__) +#define LOG(level, ...) sprintf(deP->s, __VA_ARGS__), deP->deb(level) +#endif diff -r 000000000000 -r 5c129dd80d4f CScpp/Makefile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CScpp/Makefile Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,20 @@ +#O := CS.o Debug.o RingNode.o MashNode.o N.o Node.o Partner.o Payload.o Data.o +O := CS.o Debug.o +H := CS.h + +CPPFLAGS += -Wall -D_GNU_SOURCE -lc -lpthread -lrt -lssl -lcrypto + +.PHONY: all clean +all: CS + +CS: $(O) + g++ $(CPPFLAGS) $(O) -o $@ + +%: %.cpp + g++ $(CPPFLAGS) -o $@ $< + +%.o: %.cpp $(H) + g++ -c $(CPPFLAGS) -o $@ $< + +clean: + rm -f CS $(O) diff -r 000000000000 -r 5c129dd80d4f CSj/CS.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSj/CS.java Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,1135 @@ +import java.util.*; +import java.io.*; +import java.net.*; +import javax.net.ssl.*; +import java.security.*; +import java.nio.*; +import java.nio.channels.*; +import java.awt.*; +import java.awt.image.*; +import java.awt.event.*; +import javax.swing.*; +import javax.swing.border.*; + +class Debug { + static int debug; + String debid = ""; + Debug(Debug d) { debid = d.debid; } + Debug(String debid) { this.debid = debid; } + Debug() {} + static void pRe(Object o) { + System.err.println((new Date().getTime()) + " " + o); + System.err.flush(); + } + void log(int level, Object o) { + if(debug == 7) { if(level == 7) pRe(debid + ": " + o); } + else if(debug >= level) pRe(debid + ": " + o); + } + boolean abendMsg(String msg, Exception x) { + log(0, "ABEND: " + msg + (x == null ? "" : (": " + x.getClass().getSimpleName() + ": " + x.getMessage()))); + return false; + } + String prBuf(ByteBuffer b) { return b.position() + "/" + b.remaining() + "/" + b.limit(); } +} +class Data extends Debug implements Serializable { + static final long serialVersionUID = 42; + class DataObj implements Serializable { + static final long serialVersionUID = 42; + String text; + int ttl; + int lport; + int rport; + DataObj(String s, int ttl) { + this.text = s; + this.ttl = ttl; + this.lport = 0; + this.rport = 0; + } + } + ByteBuffer buf; + DataObj data; + Data(Debug deb, String s, int ttl) throws Exception { + super(deb.debid + " DATA"); + data = new DataObj(s, ttl); + if(CS.fake != 0 && debid.equals("DEMO SSL MASH DATA")) throw new Exception("faked error"); // <----------------------------------------------- + load(); + } + Data(Debug deb, ByteBuffer b) throws Exception { + super(deb); + debid += " DATA"; + log(5, "unloading data objects from buffer..."); + unload(); + } + Data(Debug deb) { + super(deb); + debid += " DATA"; + } + void load() throws IOException { + log(5, "loading data objects to buffer..."); + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + try { + new ObjectOutputStream(bas).writeObject(data); + buf = ByteBuffer.wrap(bas.toByteArray()); + } catch(IOException x) { abendMsg("data buffer " + data + " load error", x); throw new IOException(x); } + } + void unload() throws Exception { + buf.rewind(); + data = (DataObj) new ObjectInputStream(new ByteArrayInputStream(buf.array())).readObject(); + } + int dttl() { + return --data.ttl; + } + int ttl() { + return data.ttl; + } + boolean equals(Data data) { + return this.data.text.equals(data.data.text); + } + public String toString() { + return "data: ttl=" + data.ttl + ", text=" + + (data.text.length() < 24 ? data.text : data.text.substring(0, 8) + "-------" + data.text.substring(data.text.length() - 8)); + } +} +class Node extends Debug implements Runnable { + boolean kicker, closing = false; + int thisNode, nextNode, closingThreshHold = 0; + Data data; + Selector selector; + ServerSocketChannel ssc; + SelectionKey ssk; + SSLContext sslC = null; + HashMap cliSide = new HashMap(); + HashMap srvSide = new HashMap(); + Constellation con; + Node(Constellation con, int port) { + super(con); + debid = (con.ssl ? "" : "non") + "SSL " + (con.mash ? "mash" : "ring") + " node " + port; + log(4, "initalizing..."); + try { + this.con = con; + thisNode = port; + kicker = (thisNode == con.first); + if(con.ssl) try { prepareSSL(); } catch(Exception x) { abend("SSL preparation", x); throw new Exception(); } + try { bind(); } catch(Exception x) { abend("bind", x); throw new Exception(); } + data = new Data(this); + data.buf = ByteBuffer.allocate(con.data.buf.limit()); + log(5, "initalized"); + } catch(Exception x) { abend("initialization", x); } + } + public void run() { + runLoop(); + log(2, "ended"); + synchronized(con.glob) { con.glob.spawned--; con.glob.notify(); } + } + public void runLoop() { + if(con.glob.doForward && kicker) { // kick off + log(1, "ready to initial send (" + con.data.buf.limit() + " bytes) " + con.data.toString()); + data.buf.put(con.data.buf); data.buf.flip(); + try { data.unload(); data.load(); } catch(Exception x) {}; + doForward(); + } + //if(con.glob.doForward && !con.glob.abend) do { // main loop + if(con.glob.doForward) do { // main loop + int k = 0; + log(5, "main loop, waiting for " + (con.glob.doForward ? "data" : "close") + + " from " + srvSide.size() + " open sockets, timeout=" + CS.selTO + " msecs, closingThreshHold=" + closingThreshHold); + if(!con.glob.doForward && closingThreshHold == 0) closingThreshHold = 5000 / CS.selTO; + try { + try { while((k = selector.select(CS.selTO)) == 0 && con.glob.doForward) {} + } catch(Exception x) { abend("socket channel select", x); throw new Exception(x); } + if(k > 0) { + for(Iterator ski = selector.selectedKeys().iterator(); ski.hasNext();) { + SelectionKey sk = ski.next(); + if(sk.isAcceptable()) + try { acc(); } catch(Exception x) { abend("main loop accept error", x); throw new Exception(x); } + if(sk.isReadable()) + try { forward(sk); } catch(Exception x) { abend("main loop forward error", x); throw new Exception(x); } + ski.remove(); + } + } + } catch (Exception x) { abend("main loop", x); } + if(con.glob.stop) { + log(1, "constellation stop, closing all connections"); + stop(); return; + } + } while(con.glob.doForward || ((srvSide.size() > 0) && (closingThreshHold-- > 0))); + //} while((con.glob.doForward || (srvSide.size() > 0)) && !con.glob.abend); + log(5, "closing ssc..."); + try { ssc.close(); } catch (Exception x) { abend("ssc close", x); } + if(kicker && !con.glob.stop && !con.glob.abend) con.glob.abend = !data.equals(con.data); + } + private void forward(SelectionKey sk) { + log(5, "entering 'forward'..."); + SockIO si = srvSide.get(sk); + if(!si.get()) { + stop(); + si.close(); + srvSide.remove(sk); + } + else { + log(5, "received " + prBuf(data.buf)); + try { data.unload(); } catch(Exception x) { abend("data unload at forwarding", x); return; } + if(kicker) { + log(3, "received from " + (con.mash ? "mash: " : "ring: ") + data.toString()); + if(data.dttl() <= 0) { + if(CS.isGui) con.cBox.ttl = data.ttl(); + log(1, "TTL 0 reached, received " + data.toString() + ", leaving 'forward' closing all connections"); + stop(); return; + } + try { data.load(); } catch(Exception x) { abend("data load at forwarding", x); return; } + } + if(con.glob.doForward) doForward(); + } + log(4, "leaving 'forward'"); + } + private void doForward() { + if((nextNode = chooseNextNode()) == 0) { stop(); return; } + log(3, "forwarding data to " + nextNode); + if(CS.pacing) pacing(nextNode); + if(CS.pace > 0) try { Thread.sleep(CS.pace); } catch(InterruptedException i) {}; + if(!cliSide.get(nextNode).put()) stop(); + } + private int chooseNextNode() { + int next; + if(con.mash) while((next = (con.first + CS.r.nextInt(con.nodes))) == thisNode); + else { next = thisNode + 1; if(next > con.last) next = con.first; } + if(cliSide.containsKey(next)) return next; + else return (conn(next) ? next : 0); + } + private void pacing(int nextNode) { + log(4, "constls.pacing"); + synchronized(CS.constls) { + if(!con.glob.pacingGo) try { CS.constls.wait(); } catch(InterruptedException x) {} + if(nextNode > 0) { + con.cBox.currLink[0] = thisNode - con.first; + con.cBox.currLink[1] = nextNode - con.first; + } + con.cBox.ttl = data.ttl(); + log(4, "constls.notify"); CS.constls.notify(); + con.glob.pacingGo = false; + } + } + public void stop() { + con.glob.doForward = false; + synchronized(con) { con.notify(); } + closeNode(); + } + private void abend(String msg, Exception x) { + abendMsg(msg, x); + con.glob.abend = true; + CS.isRun = false; + stop(); + } + private void closeNode() { + if(!closing) new Thread(new closeClients(this)).start(); + closing = true; + } + private class closeClients extends Debug implements Runnable { + closeClients(Debug deb) { this.debid = deb.debid + " clients CLOSE"; } + public void run() { + log(4, "starting..."); + for(Iterator i = cliSide.keySet().iterator(); i.hasNext();) { + int port = i.next(); + log(4, "closing conn to " + port); + cliSide.get(port).close(); + } + log(4, "end"); + } + } + private void prepareSSL() throws Exception { + char[] passphrase = "passphrase".toCharArray(); + KeyStore ks = KeyStore.getInstance("JKS"); + String ksFile = (System.getenv("KSF") == null) ? CS.cePath + "/testkeys" : System.getenv("KSF"); + FileInputStream kfs = new FileInputStream(ksFile); + ks.load(kfs, passphrase); + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, passphrase); + KeyStore ts = KeyStore.getInstance("JKS"); + FileInputStream tfs = new FileInputStream(ksFile); + ts.load(tfs, passphrase); + TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); + tmf.init(ts); + sslC = SSLContext.getInstance("TLS"); + sslC.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + kfs.close(); + tfs.close(); + } + private void bind() throws IOException { + log(5, "binding..."); + selector = Selector.open(); + ssc = ServerSocketChannel.open(); + ssc.configureBlocking(false); + ssc.socket().bind(new InetSocketAddress(thisNode), 1024); + ssk = ssc.register(selector, SelectionKey.OP_ACCEPT); + log(2, "bound to " + thisNode); + } + private boolean conn(int remPort) { + int retry = CS.connThreshold; + boolean connected = false; + SocketChannel sc = null; + log(4, "connecting to " + remPort + ", timeout=" + (CS.connThreshold*CS.connTO)/1000 + " secs. ..."); + while(!connected && retry-- > 0 && con.glob.doForward) { + try { + sc = SocketChannel.open(new InetSocketAddress("localhost", remPort)); + connected = true; + } catch(Exception x) { + if(x.getMessage().equals("Connection refused")) { + try { Thread.sleep(CS.connTO); } catch(InterruptedException i) {} + } else { abendMsg("connection to " + remPort, x); return false; } + } + } + if(!con.glob.doForward) return false; + if(connected) { + try { cliSide.put(remPort, new SockIO(this, sc, false)); + } catch(Exception x) { abendMsg("connection to " + remPort, x); return false; } + log(2, "connected after " + (CS.connThreshold - retry + 1) + " retries"); + return true; + } + else { abendMsg("connection to " + remPort + " timeout", null); return false; } + } + private void acc() throws Exception { + try { + SocketChannel sc = ssc.accept(); + sc.configureBlocking(false); + SockIO si = new SockIO(this, sc, true); + SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); + srvSide.put(sk, si); + CS.connCnt++; + log(2, "connection accepted"); + } catch(Exception x) { abend("connection accept", x); throw new Exception(x); } + } + private class SockIO extends Debug { + SocketChannel sc; + Selector handshakeSelector; + SelectionKey sk; + Runnable ru; + SSLSession session; + SSLEngine e; + SSLEngineResult r; + ByteBuffer ci, co, ib; + boolean wrapper; + SockIO(Debug deb, SocketChannel sc, boolean server) throws Exception { + this.debid = deb.debid + " socket " + (server ? "(server)" : "(client)"); + log(5, "initializing..."); + this.sc = sc; + if(con.ssl) { + handshakeSelector = Selector.open(); + sc.configureBlocking(false); + sk = sc.register(handshakeSelector, SelectionKey.OP_READ); + e = sslC.createSSLEngine(); + session = e.getSession(); + int am = session.getApplicationBufferSize(); + int pm = session.getPacketBufferSize(); + ib = ByteBuffer.allocate(am); // pišvejcova konstanta + co = ByteBuffer.allocateDirect(pm); + ci = ByteBuffer.allocateDirect(pm); + if(server) { + e.setUseClientMode(false); + e.setNeedClientAuth(true); + } else e.setUseClientMode(true); + } + log(4, "initialized"); + } + String prBuf(ByteBuffer b) { return b.position() + "/" + b.remaining() + "/" + b.limit() + "/" + b.capacity(); } + private String eStat() { + String stat; + if (r != null) + stat = r.getStatus() + "/" + r.getHandshakeStatus() + "/" + e.getHandshakeStatus() + + ", bytes: " + r.bytesConsumed() + "/" + r.bytesProduced(); + else stat = "-/-/" + e.getHandshakeStatus() + " -/-"; + return stat; + } + //-- result status + private boolean isOK() { + return r.getStatus() == SSLEngineResult.Status.OK; } + private boolean isClosed() { + return r.getStatus() == SSLEngineResult.Status.CLOSED; } + private boolean isBad() { + return r.getStatus() == SSLEngineResult.Status.BUFFER_OVERFLOW; } + //-- result handshake status + private boolean handShake() { + return r.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING; } + private boolean handShakeEnd() { + return r.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED; } + private boolean needTask() { + return r.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK; } + //-- engine handshake status + private boolean needUnwrap() { + return e.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP; } + private boolean needWrap() { + return e.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_WRAP; } + private int read(SocketChannel sc, ByteBuffer b) { + int n = -1, k = 0; + log(5, "read, " + prBuf(b) + " ..."); + if(con.ssl) { + try { while((k = handshakeSelector.select(CS.selTO)) == 0) { if(!con.glob.doForward) break; } + } catch (Exception x) { abendMsg("handshake select", x); k = 0; } + if(k>0 && sk.isReadable()) { + try { n = sc.read(b); } catch(Exception x) { abendMsg("socket channel read", x); n = -1; } + handshakeSelector.selectedKeys().remove(sk); + } + } else + try { while(b.hasRemaining()) { n += sc.read(b); if(n < 0) break; }; n++; + } catch(Exception x) { abendMsg("socket channel read", x); n = -1; }; + log(4, "read " + n); + return n; + } + private int write(SocketChannel sc, ByteBuffer b) { + log(5, "writing " + prBuf(b) + " ..."); + int n = 0; + try { n = sc.write(b); } catch(Exception x) { abendMsg("socket channel write", x); n = -1; } + log(4, "written " + n); + return n; + } + private boolean replenish() { + log(5, "replenishing ci..."); + ci.clear(); + int n = read(sc, ci); + ci.flip(); + return (n >= 0); + + } + void handleUnWrapStatus() { + ByteBuffer b; + if(r.getStatus() == SSLEngineResult.Status.BUFFER_OVERFLOW) { + log(5, "unwrap: ib BUFFER_OVERFLOW " + prBuf(ib)); + if(ib.position() > 0) { ib.flip(); data.buf.put(ib); ib.clear(); } + else { + b = ByteBuffer.allocate((int)(1.25 * ib.capacity())); + ib.flip(); b.put(ib); ib = b; } + log(5, "ib " + prBuf(ib)); + } + if(r.getStatus() == SSLEngineResult.Status.BUFFER_UNDERFLOW) { + int n; + log(5, "unwrap: ci BUFFER_UNDERFLOW " + prBuf(ci)); + if(ci.limit() < ci.capacity()) { + ci.mark(); ci.position(ci.limit()); ci.limit(ci.capacity()); + n = read(sc, ci); ci.limit(ci.position()); ci.reset(); } + else { + n = ci.capacity(); if(ci.position() == 0) n *= 2; + b = ByteBuffer.allocate(n); b.put(ci); ci = b; + n = read(sc, ci); ci.flip(); } + log(5, "additional " + n + " bytes read: " + prBuf(ci)); } + } + boolean doWrap() { + log(5, "entering " + "wrap, ob: " + prBuf(data.buf) + ", co: " + prBuf(co) + " ..."); + try { r = e.wrap(data.buf, co); } catch (Exception x) { abend("SSL engine wrap", x); return false; } + //if(isBad()) { con.glob.abend = true; return abendMsg("SSL engine status: " + r.getStatus().toString(), null); } + if(isBad()) { abend("SSL engine status: " + r.getStatus().toString(), null); return false; } + log(4, "after " + (wrapper ? "wrapper" : "unwrapper") + " wrap: " + eStat() + ", ob: " + prBuf(data.buf) + ", co: " + prBuf(co)); + if(isClosed()) return false; + return true; + } + private boolean wrap() { + do {co.clear(); + if(!doWrap()) return false; + if(needTask()) while((ru = e.getDelegatedTask()) != null) ru.run(); + co.flip(); + if(handShake() && (write(sc, co) == -1)) return false; + } while (needWrap()); + if(needUnwrap() && !( replenish() && unwrap() )) return false; + log(5, (wrapper ? "wrapper" : "unwrapper") + " wrap HS finished=" + handShakeEnd()); + if(wrapper && handShakeEnd()) { + co.clear(); + if(!doWrap()) return false; + co.flip(); + } + return true; + } + boolean doUnwrap() { + log(5, "entering " + "unwrap, ib: " + prBuf(ib) + ", ci: " + prBuf(ci) + " ..."); + try { r = e.unwrap(ci, ib); } catch (Exception x) { abend("SSL engine unwrap", x); return false; } + log(4, "after " + (wrapper ? "wrapper" : "unwrapper") + " unwrap: " + eStat() + ", ib: " + prBuf(ib) + ", ci: " + prBuf(ci)); + if(isClosed()) return false; + return true; + } + private boolean unwrap() { + do { // while( needUnwrap() ) + do { // while( needUnwrap() && ci.hasRemaining() ) + if(!doUnwrap()) return false; + if(needTask()) while((ru = e.getDelegatedTask()) != null) ru.run(); + } while(needUnwrap() && ci.hasRemaining()); + if(needUnwrap() && !replenish()) return false; + } while(needUnwrap()); + if(needWrap() && !wrap()) return false; + if(!wrapper) { + if(handShakeEnd() && !replenish()) return false; + if(handShakeEnd() || !handShake()) { + handleUnWrapStatus(); + while(ci.hasRemaining()) { + do { + if(!doUnwrap()) return false; + handleUnWrapStatus(); + } while(!isOK()); + } + } + } + return true; + } + boolean get() { + wrapper = false; + boolean got = false; + data.buf.clear(); + if(con.ssl) { + while(data.buf.hasRemaining()) { + ib.clear(); + int n; + ci.clear(); n = read(sc, ci); ci.flip(); + if(n < 0) { got = false; break; } + else got = (unwrap() && !isClosed()); + if(got) { ib.flip(); data.buf.put(ib); log(4, "partially got " + prBuf(data.buf)); } + else break; + } + } + else try { got = (read(sc, data.buf) >= 0); } catch(Exception x) { got = abendMsg("socket read", x); } + return got; + } + boolean put() { + wrapper = true; + boolean put = false; + log(4, "put: ob: " + prBuf(data.buf)); + try { + do { + if(con.ssl) { + if(!wrap() || isClosed()) return false; + else put = (write(sc, co) >= 0); + } else put = (write(sc, data.buf) >= 0); + } while(put && data.buf.hasRemaining()); + if(put) CS.forwCnt++; + } catch(Exception x) { put = abendMsg("socket write", x); } + return put; + } + void close() { + log(4, "terminating connection..."); + try { + if(con.ssl) { + data.buf.put(ByteBuffer.wrap("".getBytes())); + e.closeOutbound(); + wrap(); + } + sc.close(); + } catch(Exception x) {} + } + } +} +class Constellation extends Debug implements Runnable { + class Glob { + boolean doForward = true; + boolean pacingGo = false; + boolean stop = false; + boolean abend = false; + int D = 0; + int spawned = 0; + int from = 0, to = 0; + } + volatile Glob glob; + boolean mash, ssl; + int first, last, nodes; + Data data; + CS cs; + Gui.CBox cBox; + String label; + Constellation(CS cs, Gui.CBox cBox, boolean mash, boolean ssl) { + label = (ssl ? "" : "non") + "SSL " + (mash ? "MASH" : "RING"); + debid = "DEMO " + label; + this.cs = cs; + this.cBox = cBox; + this.mash = mash; + this.ssl = ssl; + if(mash) { first = CS.mp0; nodes = CS.mn; } + else { first = CS.rp0; nodes = CS.rn; } + glob = new Glob(); + } + Constellation(Constellation con) { + super(con); + this.cs = con.cs; + this.mash = con.mash; + this.ssl = con.ssl; + this.first = con.first; + this.last = con.last; + this.nodes = con.nodes; + this.glob = con.glob; + } + void stop() { glob.stop = true; } + //void reset() { + //first = mash ? CS.mp0 : CS.rp0; + //first += nodes; + //glob.doForward = true; + //glob.abend = false; + //glob.spawned = 0; + //} + public void run() { + log(4, "starting"); + glob.pacingGo = true; + runNodes(); + synchronized(CS.constls) { CS.constls.notify(); } + synchronized(cs) { + if(--CS.notFinished == 0) cs.notifyAll(); + else try { cs.wait(); } catch(InterruptedException e) {}; + } + log(1, glob.abend ? "BAD: constellation not finished correctly" : "OK, constellation finished correctly"); + if(glob.abend) CS.abend = true; + CS.spawned--; + synchronized(cs) { cs.notify(); } + } + void runNodes() { + log(1, nodes + " nodes constellation, ttl=" + CS.pttl + " starting..."); + first += (ssl ? 500 : 0); + last = first + nodes - 1; + if(data == null) + try { data = new Data(this, CS.text, CS.pttl); + } catch(Exception x) { abendMsg("creating initial data", x); return; } + synchronized(glob) { + for(int port = first; (port < first + nodes); port++) { + try { + new Thread(new Node(this, port), "Node " + port).start(); + log(3, "node " + port + " established"); + glob.spawned++; + } catch(Exception x) { glob.doForward = false; glob.abend = true; break; } + } + if(glob.doForward) log(2, "all nodes established"); + while(glob.spawned > 0) try { glob.wait(); } catch(InterruptedException e) {}; + } + log(2, "all nodes finished"); + } +} +class Gui extends Debug implements Runnable { + class Parms extends JPanel { + static final long serialVersionUID = 43; + class Parm implements ActionListener { + JComboBox valueEntry; + JLabel valueLabel; + Parm(Number[] values, Number value, String label, boolean editable, boolean rowEnd) { + log(5, "parm " + label); + valueLabel = new JLabel(label); + valueLabel.setBorder(b); + if(orientation == HORIZONTAL) gridC.gridwidth = 1; + else gridC.gridwidth = GridBagConstraints.REMAINDER; + gridL.setConstraints(valueLabel, gridC); + add(valueLabel); + valueEntry = new JComboBox(values); + valueEntry.setPreferredSize(new Dimension(prefComboWidth, prefComboHeight)); + if(value != null) valueEntry.setSelectedItem(value); + valueEntry.setEditable(editable); + valueEntry.addActionListener(this); + if(orientation == HORIZONTAL && rowEnd) gridC.gridwidth = GridBagConstraints.REMAINDER; + gridL.setConstraints(valueEntry, gridC); + add(valueEntry); + } + public void actionPerformed(ActionEvent e) { + try { setVal((Number)((JComboBox)e.getSource()).getSelectedItem()); } catch(Exception x) { log(0, x.getMessage()); } + } + void getEnv() {} + void setVal(Number v) {} + } + class Buttons extends Box { + class GoButton extends JButton implements ActionListener { + static final long serialVersionUID = 44; + GoButton() { + super("go"); + addActionListener(this); + gridC.gridwidth = 1; + gridL.setConstraints(this, gridC); + CS.go = true; + } + public void actionPerformed(ActionEvent e) { + log(5, getText() + " button pressed"); + if(getText().equals("go")) { setText("pause"); CS.go = true; CS.pacing = true; CS.isRun = true; awake(); } + else { setText("go"); CS.go = false; } + } + } + class StepButton extends JButton implements ActionListener { + static final long serialVersionUID = 44; + StepButton() { + super("step"); + addActionListener(this); + gridC.gridwidth = 1; + gridL.setConstraints(this, gridC); + } + public void actionPerformed(ActionEvent e) { + log(5, getText() + " button pressed"); + CS.go = false; CS.pacing = true; CS.isRun = true; + setGo(); + awake(); + } + } + class ResetButton extends JButton implements ActionListener { + static final long serialVersionUID = 45; + ResetButton() { + super("reset"); + addActionListener(this); + gridC.gridwidth = 1; + gridL.setConstraints(this, gridC); + } + public void actionPerformed(ActionEvent e) { CS.isRun = false; CS.isReset = true; CS.go = true; awake(); } + } + class StopButton extends JButton implements ActionListener { + static final long serialVersionUID = 46; + StopButton() { + super("end"); + addActionListener(this); + gridC.gridwidth = 1; + //gridC.gridwidth = GridBagConstraints.REMAINDER; + gridL.setConstraints(this, gridC); + } + public void actionPerformed(ActionEvent e) { closeUI(); } + } + GoButton go; + ResetButton reset; + StepButton step; + StopButton stop; + Box row1, row2; + Buttons() { + super(BoxLayout.Y_AXIS); + setBorder(b); + add(row1 = new Box(BoxLayout.X_AXIS)); + add(row2 = new Box(BoxLayout.X_AXIS)); + row1.add(go = new GoButton()); + row1.add(step = new StepButton()); + row2.add(reset = new ResetButton()); + row2.add(stop = new StopButton()); + } + void setGo() { go.setText("go"); } + void setPause() { go.setText("pause"); } + void enableStep(boolean b) { step.setEnabled(b); } + } + static final boolean EDITABLE = true; + static final boolean ROW_END = true; + boolean orientation; + EmptyBorder b = new EmptyBorder(0,7,0,7); + GridBagLayout gridL = new GridBagLayout(); + GridBagConstraints gridC = new GridBagConstraints(); + int prefComboWidth, prefComboHeight; + Buttons buttons; + Parms(boolean orientation) { + textHeight = (int)Math.round(1.5 * getFontMetrics(getFont()).getHeight()); + prefComboWidth = (int)Math.round(1.5 * getFontMetrics(getFont()).bytesWidth("000000".getBytes(), 0, 6)); + prefComboHeight = textHeight; + this.orientation = orientation; + gridC.fill = GridBagConstraints.BOTH; + setFont(new Font("SansSerif", Font.PLAIN, 9)); + setLayout(gridL); + new Parm(new Integer[] {0,1,2,3,4,5,7,9}, new Integer(CS.debug), "Debug level", !EDITABLE, !ROW_END) { + void setVal(Number v) { CS.debug = v.intValue(); } }; + new Parm(new Integer[] {0,1,2}, new Integer(CS.issl), "SSL", !EDITABLE, !ROW_END) { + void setVal(Number v) { CS.issl = v.intValue(); } }; + new Parm(new Integer[] {CS.pttl}, null, "TTL", EDITABLE, ROW_END) { + void setVal(Number v) { CS.pttl = v.intValue(); } }; + new Parm(new Double[] {(double)CS.ipace/1000}, null, "pace in secs.", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.ipace = (int)(1000.0 * v.doubleValue()); CS.pace = CS.ipace; } }; + new Parm(new Integer[] {CS.mp0}, null, "listen port of first MASH node", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.mp0 = v.intValue(); } }; + new Parm(new Integer[] {CS.rp0}, null, "listen port of first RING node", EDITABLE, ROW_END) { + void setVal(Number v) { CS.rp0 = v.intValue(); } }; + new Parm(new Integer[] {CS.mn}, null, "MASH constellation size", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.mn = v.intValue(); } }; + new Parm(new Integer[] {CS.rn}, null, "RING constellation size", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.rn = v.intValue(); } }; + new Parm(new Double[] {(double)CS.selTO/1000}, null, "I/O selection timeout in secs.", EDITABLE, ROW_END) { + void setVal(Number v) { CS.selTO = 1000 * v.intValue(); } }; + new Parm(new Integer[] {CS.fake}, null, "point of faked exception (integer)", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.fake = v.intValue(); } }; + new Parm(new Integer[] {CS.rs}, null, "random seed (integer)", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.rs = v.intValue(); } }; + //gridC.gridwidth = 0; + gridC.gridwidth = GridBagConstraints.REMAINDER; + add(buttons = new Buttons()); + } + } + class CBoxBg extends BufferedImage { + final int nodeC[][]; // node centers + CBoxBg(int n) { + super(cBoxSize , cBoxSize, BufferedImage.TYPE_INT_RGB); + nodeC = new int[n][2]; + log(5, "cnstlltn bg image beg, node centers array length=" + nodeC.length); + final double a0 = Math.PI / 2; + final double aN = 2 * Math.PI / n; + final int b = 3; // border + final int r = 5; // node diameter + int cx, cy; // constellation center coordinates + cx = cy = cBoxSize/2; + int R = cBoxSize/2 - r - 2*b; // distance of node centers from constellation center + int dx, dy; // deltas of node center coordinates + final Graphics2D g2 = (Graphics2D)this.getGraphics(); + g2.setBackground(Color.WHITE); + g2.clearRect(0, 0, cBoxSize, cBoxSize); + g2.setColor(Color.BLACK); + g2.draw3DRect(b, b, cBoxSize - 2*b, cBoxSize - 2*b, true); + if(n < 2) return; + for(int i=0; i 0) { + resultBox.add(sslBox = new CcBox("w/")); + sslMashBox = sslBox.mashBox; + sslRingBox = sslBox.ringBox; + } + if(CS.issl != 1) { + resultBox.add(nonSslBox = new CcBox("non")); + nonSslMashBox = nonSslBox.mashBox; + nonSslRingBox = nonSslBox.ringBox; + } + ui.pack(); + } + void awake() { synchronized(CS.gui) { CS.gui.notify(); } } + void dashboardReset() { + //parms.buttons.setGo(); + parms.buttons.enableStep(true); + } + void closeUI() { + log(5, "closing window"); + CS.isGui = false; CS.isRun = false; CS.go = true; awake(); + } +} +public class CS extends Debug { + volatile static int + connCnt = 0, + forwCnt = 0, + notFinished = 0, + spawned = 0; + volatile static boolean abend = false; + static String text = "bla bla"; + static String clsPath; + static String cePath; + static final String sslPathSuffP = "../CS"; + static Random r; + static int + mn = 0, + mp0 = 11000, + rn = 0, + rp0 = 12000, + pttl = 3, + issl = 0, + connThreshold = 77, // connection retries threshold + connTO = 99, // connection sleep time in msecs + selTO = 999, // selection timeout in msecs + ipace = 0, + pace = 0, + rs = 0, + fake = 0; + static boolean isGui = false, isRun = true, isReset = false, go = true, pacing = false; + static ArrayList constls; + public static Constellation nonSslMashCon, nonSslRingCon, sslMashCon, sslRingCon; + static Gui gui; + static Gui.CBox nonSslMashBox, nonSslRingBox, sslMashBox, sslRingBox; + CS() { + debid = "client server DEMO"; + getArgs(); + } + boolean isArg(String a) { return System.getenv(a) != null; } + int getArgI(String a) throws Exception { + int i = -1; + if(System.getenv(a) != null) + if(!System.getenv(a).equals("")) + try { i = Integer.valueOf(System.getenv(a)); + } catch(NumberFormatException x) { throw new Exception(a + "=\terror in number format"); } + return i; + } + double getArgF(String a) throws Exception { + double d = -1; + if(System.getenv(a) != null) + if(!System.getenv(a).equals("")) + try { d = Double.valueOf(System.getenv(a)); + } catch(NumberFormatException x) { throw new Exception(a + "=\terror in number format"); } + return d; + } + int getMArg(String a) throws Exception { + int i; + if((i = getArgI(a)) == 0) throw new Exception(a + " is mandatory"); + return i; + } + void getArgs() { + try { + clsPath = getClass().getProtectionDomain().getCodeSource().getLocation().getPath(); + if(getArgI("DEB") >= 0) debug = getArgI("DEB"); + if(System.getenv("T") != null) text = System.getenv("T"); + if(getArgI("TTL") > 0) pttl = getArgI("TTL"); + if(getArgF("P") >= 0) ipace = (int)(getArgF("P") * 1000); + if(getArgI("MP0") > 0) mp0 = getArgI("MP0"); + if(getArgI("RP0") > 0) rp0 = getArgI("RP0"); + if(getArgI("N") >= 0) { mn = getArgI("N"); rn = mn; } + if(getArgI("SSL") >= 0) issl = getArgI("SSL"); + if(System.getenv("CEP") != null) cePath = System.getenv("CEP"); + else cePath = clsPath + sslPathSuffP; + if(getArgI("MN") >= 0) mn = getArgI("MN"); + if(getArgI("RN") >= 0) rn = getArgI("RN"); + if(getArgF("STO") >= 0) selTO = (int)(getArgF("STO") * 1000); + if(getArgI("FAKE") >= 0) fake = getArgI("FAKE"); + if(isArg("RS")) rs = getArgI("RS"); + if(isArg("G")) isGui = getArgI("G") == 1; + } catch(Exception x) { log(0, x.getMessage()); return; } + } + void dashboard() { + gui = new Gui(this); + log(4, "wait for args from GUI"); + synchronized(gui) { + try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); } + try { gui.wait(); } catch(InterruptedException x) {} + } + if(isGui) cboxes(); + } + void cboxes() { + log(4, "cboxes"); + try { gui.cboxes(); } catch(Exception x) { log(0, x.getMessage()); } + try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); } + nonSslMashBox = gui.nonSslMashBox; + nonSslRingBox = gui.nonSslRingBox; + sslMashBox = gui.sslMashBox; + sslRingBox = gui.sslRingBox; + + } + void constellations() { + pace = ipace; + spawned = 0; notFinished = 0; + constls = new ArrayList(); + if(mn == 1) log(0, "one-node MASH configuration not implemented"); + if(mn > 1) { + if(issl > 0) { sslMashCon = new Constellation(this, sslMashBox, true, true); constls.add(sslMashCon); } + if(issl != 1) { nonSslMashCon = new Constellation(this, nonSslMashBox, true, false); constls.add(nonSslMashCon); } + } + if(rn == 1) log(0, "one-node RING configuration not implemented"); + if(rn > 1) { + if(issl > 0) { sslRingCon = new Constellation(this, sslRingBox, false, true); constls.add(sslRingCon); } + if(issl != 1) { nonSslRingCon = new Constellation(this, nonSslRingBox, false, false); constls.add(nonSslRingCon); } + } + } + void stop() { + if(sslMashCon != null) sslMashCon.stop(); + if(sslRingCon != null) sslRingCon.stop(); + if(nonSslMashCon != null) nonSslMashCon.stop(); + if(nonSslRingCon != null) nonSslRingCon.stop(); + pacing = false; + pace = 0; + go = true; + } + void reset() { + gui.dashboardReset(); + cboxes(); + mp0 += mn; rp0 += rn; // port is unusable 30 secs after port close due to special timeout + constellations(); + isReset = false; + } + String switches(String label) { + return label + ": isGui=" + isGui + ", isRun=" + isRun + ", go=" + go + ", pacing=" + pacing + ", spawned=" + spawned; + } + void pacingGo() { for(Constellation con : constls) con.glob.pacingGo = true; } + void runGuiCon() { + synchronized(constls) { + while(isGui && isRun && spawned > 0) { + log(4, switches("runCons constls.wait")); + try { constls.wait(); } catch(InterruptedException x) {} + pacingGo(); + log(4, switches("runCons constls.paint")); + if(!isGui) break; + try { SwingUtilities.invokeAndWait(CS.gui); } catch(Exception x) { throw new Error(x); } + if(!isGui || !isRun) break; + if(!go) { + log(4, switches("runCons constls.gui.wait")); + synchronized(gui) { try { gui.wait(); } catch(InterruptedException x) {} } + } + if(!isGui || !isRun) break; + log(4, switches("runCons constls.notifyAll")); + constls.notifyAll(); + } + } + if(isGui && isRun) { + log(4, switches("runCons last repaint")); + try { SwingUtilities.invokeAndWait(CS.gui); } catch(Exception x) { throw new Error(x); } + if(!go) synchronized(gui) { try { gui.wait(); } catch(InterruptedException x) {} } + } + else { + log(4, switches("runCons stop")); + stop(); + synchronized(constls) { constls.notifyAll(); } + while(spawned > 0) synchronized(this) { try { wait(); } catch(InterruptedException x) {} } + } + } + void runCons() { + if(isGui) pacing = true; + else pacing = false; + for(Constellation con : constls) { notFinished++; new Thread(con, con.label).start(); spawned++; } + if(isGui) runGuiCon(); + else while(spawned > 0) synchronized(this) { try { wait(); } catch(InterruptedException x) {} } + } + public void run() { + if(isGui) dashboard(); + if(isRun) { + log(1, "pgm=" + clsPath + getClass().getName() + + ", ttl=" + pttl + ", pace=" + ipace + "msecs, seed=" + rs + ", SSL=" + issl + ", fake=" + fake + ", debug=" + debug); + if(issl > 0) log(3, "cePath=" + cePath); + constellations(); + if(!isGui) { r = new Random(rs); runCons(); } + else while(isGui) { + r = new Random(rs); + runCons(); + log(1, "all constellations finished"); + if(!isGui) break; + gui.parms.buttons.setGo(); + if(!go) gui.parms.buttons.step.setEnabled(false); + if(abend) { + gui.parms.buttons.go.setEnabled(false); + gui.parms.buttons.step.setEnabled(false); + gui.parms.buttons.reset.setEnabled(false); + } + try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); } + if(!isReset) synchronized(gui) { try { gui.wait(); } catch(InterruptedException x) {} } + if(isGui) { + reset(); + do synchronized(gui) { + try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); } + if(!isRun) try { gui.wait(); } catch(InterruptedException x) {} + if(isGui && isReset) reset(); + } while(isGui && !isRun); + } + } + } + log(1, "final balance, connections=" + connCnt + ", forwards=" + forwCnt); + if(gui != null) gui.ui.dispose(); + log(2, "run end"); + } + public static void main(String[] args) throws Exception { + CS cs = new CS(); + cs.run(); + cs.log(1, "cs end"); + //try { cs.run(); } catch(Exception x) { cs.log(0, "interrupted execution"); }; + } +} +// rozeznání konce ve step-módu +// pacing slide +// input fields +// ukládání parametrů +// too many open files +// exceptions +// stavové zprávy na dashboardu \ No newline at end of file diff -r 000000000000 -r 5c129dd80d4f CSj/Makefile --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSj/Makefile Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,2 @@ +all: CS.java + javac CS.java diff -r 000000000000 -r 5c129dd80d4f CSp/CS.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSp/CS.py Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,315 @@ +#!/usr/bin/python3 +# coding=utf8 + +# ring nonssl: cca 8000 rings/sec*3nodes +# ring ssl: cca 2600 rings/sec*3nodes +# mash nonssl: cca 2000 mashes/sec*3nodes +# mash ssl: cca 300 mashes/sec*3nodes +# u mashe je čas na přenosy úměrný počtu uzlů, +# kdežto čas na connect/close je úměrný počtu spojů tj. kvadrátu počtu uzlů + +import time +import socket +import errno +import os +import signal +import sys +import pickle +import ssl +import select +import random +import multiprocessing +import threading + + +class Debug(): + def __init__(self, debid): + self.debid = debid + def log(self, level, *msg): + if level <= maxDebLev: + log_lock.acquire() + print("{:10.6f} {}:".format(time.time()-t0, self.debid), *msg, file=sys.stderr) + sys.stderr.flush() + log_lock.release() + def abend(self, s): + self.log(0, s) + os.kill(0, signal.SIGTERM) + + +class Node(Debug): + def __init__(self, debid, forwarding, topo, port, p0, pn, issl): + Debug.__init__(self, "{} node {}".format(debid, port)) + self.topo = topo + self.locPort = port + self.p0 = p0 + self.pn = pn + self.ssc = None + self.cli_side = {} + self.srv_side = {} + self.kicker = (self.locPort == self.pn) + self.forwarding = forwarding # in forwarding kicker task indicates when TTL reached 0 + self.closing = False + self.payload = None + self.issl = issl + if issl: + self.log(4, "setting SSL context...") + self.sslCert = cePath + "/certs/{}.pem".format(port) + self.sslKey = cePath + "/keys/{}.key".format(port) + try: + self.ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + self.ctx.verify_mode = ssl.CERT_REQUIRED + self.log(5, "cert={}, key={}".format(self.sslCert, self.sslKey)) + self.ctx.load_cert_chain(self.sslCert, self.sslKey) + self.ctx.load_verify_locations(None, caPath) + except ssl.SSLError as e: self.abend("SSL context: {}".format(str(e))) + def run(self): + self.bind() + self.payload = Data(self.debid, self.locPort) + if self.kicker: + nxt = self.next_node() + self.log(1, "kicker ready to send '{}' to {}".format(self.payload.digest(), nxt)) + if not nxt in self.cli_side: self.conn(nxt) + self.payload.put(self.cli_side[nxt]) + if self.forwarding[0].value: wait_list = (self.ssc,) + while wait_list: + self.log(4, "select...") + self.log(5, "waitlist:", *(sc.fileno() for sc in wait_list)) + ready_list = select.select(wait_list, (), (), sel_TO) + self.log(5, "readylist:", *(sc.fileno() for sc in ready_list[0])) + for sc in ready_list[0]: + self.log(5, "sc {} ready...".format(sc.fileno())) + if sc == self.ssc: sc = self.acc() + self.forward(sc) + wait_list = () + self.log(4, "forwarding={}".format(self.forwarding[0].value)) + if self.forwarding[0].value: wait_list += (self.ssc,) # when off, no new connection will come on ssc + else: self.close_cli() + for sc in self.srv_side.values(): wait_list += (sc,) + self.close_srv() + signal.signal(signal.SIGUSR2, sighand) + last = 0 + ctr_lock.acquire() + active.value -= 1 + if active.value == 0: last = 1 + ctr_lock.release() + if last: os.kill(0, signal.SIGUSR2) + else: signal.pause() + self.log(2, "ended") + os._exit(0) + def forward(self, sc): + if not self.payload.get(sc): + self.log(5, "delete srv_side[{}]".format(sc.fileno())) + del self.srv_side[sc] + self.log(4, "closing {}...".format(sc.fileno())) + sc.close() + else: + self.log(5, "received data from {}".format(self.payload.rport)) + ctr_lock.acquire(); + forwards.value += 1; + ctr_lock.release() + if self.kicker: + self.payload.dttl() + self.log(3, "received from {}: {}, ttl={}".format(self.topo, self.payload.digest(), self.payload.ttl)) + if self.payload.ttl <= 0: + self.log(1, "received after passing all {}: {}".\ + format("mashes" if self.topo == "mash" else "rings", self.payload.digest())) + self.forwarding[0].value = 0 + return + nxt = self.next_node() + self.log(5, "forwarding to {}...".format(nxt)) + if pacing: time.sleep(pace) + if not nxt in self.cli_side: self.conn(nxt) + self.payload.put(self.cli_side[nxt]) + self.log(5, "forwarded to {}".format(nxt)) + def next_node(self): + if self.topo == "ring": + if self.kicker: nxt = self.p0 + else: nxt = self.locPort + 1 + else: + nxt = self.locPort + while nxt == self.locPort: + nxt = random.randint(self.p0, self.pn) + return nxt + def bind(self): + self.log(4, "binding...") + try: + self.ssc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.ssc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + except Exception as e: + self.abend("ssc alloc: {}".format(e.strerror)) + if self.issl: + self.log(4, "ssc SSL wrap") + try: self.ssc = self.ctx.wrap_socket(self.ssc) + except ssl.SSLError as e: self.abend("ssc SSL wrap: {}".format(str(e))) + try: + self.ssc.bind(("127.0.0.1", self.locPort)) + self.ssc.listen(1) + except Exception as e: self.abend("bind: {}".format(e.strerror)) + self.log(2, "bound") + def conn(self, remPort): + self.log(4, "connecting to {}...".format(remPort)) + try: sc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + except Exception as e: self.abend("socket alloc: {}".format(e.strerror)) + if self.issl: + self.log(4, "sc SSL wrap") + try: sc = self.ctx.wrap_socket(sc) + except ssl.SSLError as e: self.abend("sc SSL wrap: {}".format(str(e))) + retry = connThreshold + connected = False + while not connected and retry > 0: + try: + sc.connect(("127.0.0.1", remPort)) + connected = True + except Exception as e: + if e.errno == errno.ECONNREFUSED: + retry = retry - 1 + time.sleep(conn_TO) + else: self.abend("connect: {}".format(str(e))) + if retry == 0: self.abend("connection refused, threshold {} reached".format(connThreshold)) + ctr_lock.acquire(); + connects.value += 1; + ctr_lock.release() + try: self.cli_side[remPort] = sc.makefile("wb") + except Exception as e: self.abend("client side makefile: {}".format(str(e))) + self.log(2, "connected to {} after {} retries".format(remPort, connThreshold - retry)) + def acc(self): + self.log(4, "accepting...") + try: + ac = self.ssc.accept() + sc = ac[0] + except Exception as e: self.abend("accept: {}".format(str(e))) + try: sc = sc.makefile("rb") + except Exception as e: self.abend("srv side makefile: {}".format(str(e))) + self.srv_side[sc] = sc + self.log(2, "accepted on sc={}, addr={}".format(sc.fileno(), ac[1])) + return sc + def close_srv(self): + self.log(5, "closing ssc...") + if self.ssc: self.ssc.close() + def close_cli(self): + def do_close(): + self.log(5, "closing clients...") + scs = self.cli_side.values() + self.cli_side.clear() + for sc in scs: sc.close() + self.log(4, "all clients closed") + if not self.closing: + threading.Thread(target = do_close, name = "client {} close".format(self.locPort)).start() + self.closing = True + + +class Data(Debug): + def __init__(self, debid, port): + self.debid = debid + " payload" + self.clear() + self.ttl = ittl + self.lport = port + self.rport = 0 + self.text = "" + def clear(self): + (self.ttl, self.rport, self.text) = 3 * (None,) + def put(self, sc): + self.log(5, "sending via {}...".format(sc.fileno())) + if self.ttl == None: self.ttl = ittl + if self.text == None: self.text = itext + try: + pickle.dump((self.ttl, self.lport, self.text), sc) + sc.flush() + except Exception as e: self.abend("send: {}".format(str(e))) + def get(self, sc): + self.log(5, "reading from {}...".format(sc.fileno())) + self.clear() + try: + (self.ttl, self.rport, self.text) = pickle.load(sc) + return True + except Exception as e: + if isinstance(e, EOFError): return False + else: self.abend("receive: {}".format(str(e))) + def dttl(self): + self.ttl -= 1 + return self.ttl + def digest(self): + return self.text if len(self.text) < 24 else self.text[0:8]+"--------"+self.text[-8:] + def toString(self): + return "ttl={}, from port={}, text={}".\ + format(str(self.ttl), str(self.rport), self.digest()) + +class Constellation(Debug): + def __init__(self, issl, topo, p0, n): + Debug.__init__(self, "{}SSL {}".format("" if issl else "non", topo.upper())) + def run(self, issl, topo, p0, n): + signal.signal(signal.SIGUSR2, signal.SIG_IGN) + forwarding = [multiprocessing.Value('i', 1, lock=False)] # list is passed by reference + if n == 1: + self.log(0, "one-node configuration is not implemented") + else: + self.log(1, "{} nodes starting...".format(n)) + p0 += 500 if issl else 0 + pn = p0 + n - 1 + for port in range(p0, p0 + n): + pid = os.fork() + if not pid: Node(self.debid, forwarding, topo, port, p0, pn, issl).run() + else: self.log(4, "node {} started in process {}".format(port, pid)) + self.log(2, "all nodes established") + while 1: + try: + p = os.wait() + if p[1] & 255: + self.log(4, "pid {} killed by {}".format(p[0], p[1] & 255)) + else: + self.log(4, "pid {} returned {}".format(p[0], p[1] >> 8)) + except: break + os._exit(0) + +def ga(key, default): + return os.environ[key] if key in os.environ else default +def gi(key, default): + return int(ga(key, default)) +def sighand(signal, frame): + pass + +debug = Debug("client/server demo") +log_lock = multiprocessing.Lock() +ctr_lock = multiprocessing.Lock() +forwards = multiprocessing.Value('i', 0) +connects = multiprocessing.Value('i', 0) +active = multiprocessing.Value('i', 0) +t0 = time.time() +maxDebLev = gi('DEB', 0) +mn = rn = gi('N', 0) +rp0 = gi('RP0', 11000) +rn = gi('RN', 3) +mp0 = gi('MP0', 12000) +mn = gi('MN', 3) +itext = ga('T', "bla bla") +ittl = gi('TTL', 3) +pace = float(os.environ["P"]) if "P" in os.environ else 0 +pacing = 1 if pace > 0 else 0 +random.seed(gi('RS', 0)) +connThreshold = 77 +conn_TO = 0.01 +sel_TO = 1 +issl = gi('SSL', 0) +caPath = "/home/local/etc/ssl/certs/" +sslPathSuff = "/../CS/" +cePath = os.environ["CEP"] if "CEP" in os.environ else os.path.dirname(sys.argv[0]) + sslPathSuff +active.value = mn + rn +if issl > 1: active.value *= 2 +signal.signal(signal.SIGUSR2, signal.SIG_IGN) +debug.log(1, "pgm={}, ttl={}, pace={}, seed={}, SSL mask={}, debug={}".format(sys.argv[0], ittl, pace, gi('RS', 0), issl, maxDebLev)) +if issl > 0: debug.log(3, "ssl path: {}, CA path: {}".format(cePath, caPath)) + +if issl < 2: issl = (issl,) +else: issl = (0, 1) +if ittl > 0: + for ss in issl: + topo = ("mash", "ring") + p0 = (mp0, rp0) + n = (mn, rn) + for p in zip(topo, p0, n): + if not os.fork(): + Constellation(ss, *p).run(ss, *p) + while 1: + try: os.wait() + except: break +debug.log(1, "final balance: forwards={}, connections={}".format(forwards.value, connects.value)) diff -r 000000000000 -r 5c129dd80d4f cs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cs Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,75 @@ +#!/bin/bash + +DEBID="client/server DEMO starting script" +D=$(dirname $0) + +[[ $(basename $0) == cs ]] && { + echo $DEBID: + echo "Syntax: $(dirname $0)/{csa,csc,cscpp,csj,csp}" + echo "env:" + echo -e "\tDEB\tdebug level 0-9; 7=msgs of level 7 only; 9=asm komponent sandbox" + echo -e "\tRS\trandom seed" + echo -e "\tP\tpacing, secs in float" + echo -e "\tT\ttext sent" + echo -e "\tMP0\tlistening port of the first node of MASH constellation" + echo -e "\tPP0\tlistening port of the first node of RING constellation" + echo -e "\tMN\tnum. of MASH nodes" + echo -e "\tRN\tnum. of RING nodes" + echo -e "\tN\tnum. of MASH and RING nodes; overchanged by MN/RN" + echo -e "\tTTL" + echo -e "\tSSL\t0=no SSL, 1=SSL, 2=both" + echo -e "\tCAP\tpath to SSL CA certs" + echo -e "\tCEP\tpath to local SSL stuff (keys and certificates)" + exit 1 + } + +# explicit values for this run +_MP0=$MP0 +_RP0=$RP0 +_MN=$MN +_RN=$RN + +[[ $_MP0 ]] && MP0=$_MP0 +[[ $_RP0 ]] && RP0=$_RP0 + +[[ $_MN ]] && MN=$_MN +[[ $_RN ]] && RN=$_RN + +[[ $N ]] && { MN=$N; RN=$N; } + +[[ -z $CEP ]] && CEP=$D/CS/ # SSL certs path +[[ -z $CAP ]] && CAP="/home/local/etc/ssl/certs/" # CA certs ppath +[[ -z $RS ]] && RS=$RANDOM + +p= +[[ $DEB ]] && p+="DEB=$DEB" +[[ $RN ]] && p+=" RN=$RN" +[[ $RP0 ]] && p+=" RP0=$RP0" +[[ $MN ]] && p+=" MN=$MN" +[[ $MP0 ]] && p+=" MP0=$MP0" +[[ $N ]] && p+=" N=$N" +[[ $T ]] && p+=' T="$T"' +[[ $SSL ]] && p+=" SSL=$SSL" +[[ $TTL ]] && p+=" TTL=$TTL" +[[ $FAKE ]] && p+=" FAKE=$FAKE" +[[ $STO ]] && p+=" STO=$STO" +[[ $CAP ]] && p+=" CAP=$CAP" +[[ $CEP ]] && p+=" CEP=$CEP" +[[ $P ]] && p+=" P=$P" +[[ $RS ]] && p+=" RS=$RS" + +trap "echo $DEBID: signal USR1 caught" USR1 +trap "" USR2 +trap "echo $DEBID: ABEND" TERM +case $(basename $0) in + csp ) eval $p $D/CSp/CS.py ;; +# csj ) eval $p java -cp $D/CSj cs.CS ;; + csj ) eval $p java -cp $D/CSj CS ;; + cscpp ) eval $p $D/CScpp/CS ;; + csc ) eval $p $D/CSc/CS ;; + csa ) eval $p $D/CSa32/CS ;; + * ) ;; +esac +RC=$? +[[ $RS && $DEB -gt 0 ]] && echo RS: $RS, RC: $RC +exit $RC diff -r 000000000000 -r 5c129dd80d4f csa --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/csa Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,1 @@ +cs \ No newline at end of file diff -r 000000000000 -r 5c129dd80d4f csc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/csc Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,1 @@ +cs \ No newline at end of file diff -r 000000000000 -r 5c129dd80d4f cscpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cscpp Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,1 @@ +cs \ No newline at end of file diff -r 000000000000 -r 5c129dd80d4f csj --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/csj Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,1 @@ +cs \ No newline at end of file diff -r 000000000000 -r 5c129dd80d4f csp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/csp Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,1 @@ +cs \ No newline at end of file diff -r 000000000000 -r 5c129dd80d4f csr --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/csr Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,1 @@ +cs \ No newline at end of file diff -r 000000000000 -r 5c129dd80d4f cst --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/cst Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,1 @@ +cs \ No newline at end of file diff -r 000000000000 -r 5c129dd80d4f meter --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/meter Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,59 @@ +#/bin/bash + +trap "" USR2 # USR2 is used to synchronize subprocesses + +home=$(dirname $0) + +save() { + cat >$fn <skok)) && { ((n+=500)); ((skok+=1000)); } + echo n=$n + echo skok=$skok +} + +trap exi 2 +fn=$(basename $0).port +[[ -e $fn ]] || save 11000 $(date +%s) +. $fn +D=$(date +%s)-$TS +((RP0>29999)) || ((D>60)) && { RP0=11000; TS=$(date +%s); save $RP0 $TS; } +declare -a langs +langs=(Asm C C++ Java Python) +declare -A cmds times +cmds=([Asm]="csa" [C]="csc" ["C++"]="cscpp" [Java]="csj" [Python]="csp") + +[[ $RUNS ]] || export RUNS=3 +[[ $DEB ]] || export DEB=0 +[[ $MN ]] || export MN=0 +[[ $RN ]] || export RN=33 # pro python radši ne víc než 50 +[[ $TTL ]] || export TTL=3333 +[[ $SSL ]] || export SSL=0 +echo -e DEB=$DEB\\tMN=$MN\\tRN=$RN\\tTTL=$TTL\\tSSL=$SSL + +n=0; skok=500; port=$RP0 +for j in $(seq $RUNS) + do + for k in "${langs[@]}" + do + export RP0=$((port+n)) + times[$k]=$(/usr/bin/time -o /dev/stdout -f %U $home/${cmds[$k]}) + printf "%-12s%4.2f\n" $k ${times[$k]} + eval $(posun) + done + echo +done +save $((port+n)) $(date +%s)