--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CS/CS.cfg Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,335 @@
+#
+# OpenSSL example configuration file.
+# This is mostly being used for generation of certificate requests.
+#
+
+# This definition stops the following lines choking if HOME isn't
+# defined.
+HOME = .
+RANDFILE = $ENV::HOME/.rnd
+
+# Extra OBJECT IDENTIFIER info:
+#oid_file = $ENV::HOME/.oid
+oid_section = new_oids
+
+# To use this configuration file with the "-extfile" option of the
+# "openssl x509" utility, name here the section containing the
+# X.509v3 extensions to use:
+# extensions =
+# (Alternatively, use a configuration file that has only
+# X.509v3 extensions in its main [= default] section.)
+
+[ new_oids ]
+
+# We can add new OIDs in here for use by 'ca' and 'req'.
+# Add a simple OID like this:
+# testoid1=1.2.3.4
+# Or use config file substitution like this:
+# testoid2=${testoid1}.5.6
+
+####################################################################
+[ ca ]
+default_ca = CA_default # The default ca section
+
+####################################################################
+[ CA_default ]
+
+dir = /home/local/etc/ssl # Where everything is kept
+certs = $dir/certs # Where the issued certs are kept
+crl_dir = $dir/crl # Where the issued crl are kept
+database = $dir/index.txt # database index file.
+#unique_subject = no # Set to 'no' to allow creation of
+ # several ctificates with same subject.
+new_certs_dir = $dir/newcerts # default place for new certs.
+
+certificate = $dir/certs/hh_ca.crt # The CA certificate
+serial = $dir/serial # The current serial number
+crlnumber = $dir/crlnumber # the current crl number
+ # must be commented out to leave a V1 CRL
+crl = $dir/crl.pem # The current CRL
+private_key = $dir/private/hh_ca.key # The private key
+RANDFILE = $dir/private/.rand # private random number file
+
+x509_extensions = usr_cert # The extentions to add to the cert
+
+# Comment out the following two lines for the "traditional"
+# (and highly broken) format.
+name_opt = ca_default # Subject Name options
+cert_opt = ca_default # Certificate field options
+
+# Extension copying option: use with caution.
+# copy_extensions = copy
+
+# Extensions to add to a CRL. Note: Netscape communicator chokes on V2 CRLs
+# so this is commented out by default to leave a V1 CRL.
+# crlnumber must also be commented out to leave a V1 CRL.
+crl_extensions = crl_ext
+
+default_days = 365 # how long to certify for
+default_crl_days= 30 # how long before next CRL
+default_md = sha1 # which md to use.
+preserve = no # keep passed DN ordering
+
+# A few difference way of specifying how similar the request should look
+# For type CA, the listed attributes must be the same, and the optional
+# and supplied fields are just that :-)
+policy = policy_anything
+
+####################################################################
+# For the CA policy
+[ policy_match ]
+countryName = match
+stateOrProvinceName = match
+organizationName = match
+organizationalUnitName = optional
+commonName = supplied
+emailAddress = optional
+
+####################################################################
+# For the 'anything' policy
+# At this point in time, you must list all acceptable 'object'
+# types.
+[ policy_anything ]
+countryName = optional
+stateOrProvinceName = optional
+localityName = optional
+organizationName = optional
+organizationalUnitName = optional
+commonName = supplied
+emailAddress = optional
+
+####################################################################
+[ req ]
+default_bits = 2048
+default_keyfile = privkey.pem
+distinguished_name = req_distinguished_name
+attributes = req_attributes
+x509_extensions = v3_ca # The extentions to add to the self signed cert
+
+# Passwords for private keys if not present they will be prompted for
+# input_password = secret
+# output_password = secret
+
+# This sets a mask for permitted string types. There are several options.
+# default: PrintableString, T61String, BMPString.
+# pkix : PrintableString, BMPString.
+# utf8only: only UTF8Strings.
+# nombstr : PrintableString, T61String (no BMPStrings or UTF8Strings).
+# MASK:XXXX a literal mask value.
+# WARNING: current versions of Netscape crash on BMPStrings or UTF8Strings
+# so use this option with caution!
+string_mask = nombstr
+prompt = no
+
+####################################################################
+# req_extensions = v3_req # The extensions to add to a certificate request
+[ req_distinguished_name ]
+countryName = CZ
+stateOrProvinceName = --
+localityName = Praha
+0.organizationName = H.H.
+organizationalUnitName = --
+commonName = $ENV::CN
+emailAddress = hh@hh.cz
+
+# SET-ex3 = SET extension number 3
+
+[ req_attributes ]
+#challengePassword = A challenge password
+#challengePassword_min = 4
+#challengePassword_max = 20
+
+#unstructuredName = An optional company name
+
+####################################################################
+[ usr_cert ]
+# These extensions are added when 'ca' signs a request.
+
+# This goes against PKIX guidelines but some CAs do it and some software
+# requires this to avoid interpreting an end user certificate as a CA.
+basicConstraints=CA:FALSE
+
+# Here are some examples of the usage of nsCertType. If it is omitted
+# the certificate can be used for anything *except* object signing.
+# This is OK for an SSL server.
+# nsCertType = server
+# For an object signing certificate this would be used.
+# nsCertType = objsign
+# For normal client use this is typical
+# nsCertType = client, email
+# and for everything including object signing:
+# nsCertType = client, email, objsign
+
+# This is typical in keyUsage for a client certificate.
+keyUsage = nonRepudiation, digitalSignature, keyEncipherment
+
+# This will be displayed in Netscape's comment listbox.
+nsComment = "hh_ca - OpenSSL Generated Certificate"
+
+# PKIX recommendations harmless if included in all certificates.
+subjectKeyIdentifier=hash
+#authorityKeyIdentifier=keyid:always,issuer:always
+
+# This stuff is for subjectAltName and issuerAltname.
+# Import the email address.
+subjectAltName=email:copy
+# An alternative to produce certificates that aren't
+# deprecated according to PKIX.
+# subjectAltName=email:move
+
+# Copy subject details
+#issuerAltName=issuer:copy
+
+nsCaRevocationUrl = http://www.hh.cz/ca-crl.pem
+#nsBaseUrl
+nsRevocationUrl = http://www.hh.cz/ca-crl.pem
+#nsRenewalUrl
+#nsCaPolicyUrl
+#nsSslServerName
+
+####################################################################
+[ srv_cert ]
+# These extensions are added when 'ca' signs a request.
+
+# This goes against PKIX guidelines but some CAs do it and some software
+# requires this to avoid interpreting an end user certificate as a CA.
+basicConstraints=CA:FALSE
+
+# Here are some examples of the usage of nsCertType. If it is omitted
+# the certificate can be used for anything *except* object signing.
+# This is OK for an SSL server.
+nsCertType = server
+# For an object signing certificate this would be used.
+# nsCertType = objsign
+# For normal client use this is typical
+# nsCertType = client, email
+# and for everything including object signing:
+# nsCertType = client, email, objsign
+
+# This is typical in keyUsage for a client certificate.
+keyUsage = nonRepudiation, digitalSignature, keyEncipherment
+
+# This will be displayed in Netscape's comment listbox.
+nsComment = "hh_ca - OpenSSL Generated Certificate"
+
+# PKIX recommendations harmless if included in all certificates.
+subjectKeyIdentifier=hash
+#authorityKeyIdentifier=keyid:always,issuer:always
+
+# This stuff is for subjectAltName and issuerAltname.
+# Import the email address.
+subjectAltName=email:copy
+# An alternative to produce certificates that aren't
+# deprecated according to PKIX.
+# subjectAltName=email:move
+
+# Copy subject details
+#issuerAltName=issuer:copy
+
+nsCaRevocationUrl = http://www.hh.cz/ca-crl.pem
+#nsBaseUrl
+nsRevocationUrl = http://www.hh.cz/ca-crl.pem
+#nsRenewalUrl
+#nsCaPolicyUrl
+#nsSslServerName
+
+####################################################################
+[ v3_req ]
+# Extensions to add to a certificate request
+
+basicConstraints = CA:FALSE
+keyUsage = nonRepudiation, digitalSignature, keyEncipherment
+
+####################################################################
+[ v3_ca ]
+# Extensions for a typical CA
+
+# PKIX recommendation.
+subjectKeyIdentifier=hash
+#authorityKeyIdentifier=keyid:always,issuer:always
+
+# This is what PKIX recommends but some broken software chokes on critical
+# extensions.
+#basicConstraints = critical,CA:true
+# So we do this instead.
+basicConstraints = CA:true
+
+# Key usage: this is typical for a CA certificate. However since it will
+# prevent it being used as an test self-signed certificate it is best
+# left out by default.
+# keyUsage = cRLSign, keyCertSign
+
+# Some might want this also
+nsCertType = sslCA, emailCA
+
+# Include email address in subject alt name: another PKIX recommendation
+subjectAltName=email:copy
+# Copy issuer details
+#issuerAltName=issuer:copy
+
+# DER hex encoding of an extension: beware experts only!
+# obj=DER:02:03
+# Where 'obj' is a standard or added object
+# You can even override a supported extension:
+# basicConstraints= critical, DER:30:03:01:01:FF
+
+####################################################################
+[ crl_ext ]
+# CRL extensions.
+# Only issuerAltName and authorityKeyIdentifier make any sense in a CRL.
+
+# issuerAltName=issuer:copy
+authorityKeyIdentifier=keyid:always,issuer:always
+
+####################################################################
+[ proxy_cert_ext ]
+# These extensions should be added when creating a proxy certificate
+
+# This goes against PKIX guidelines but some CAs do it and some software
+# requires this to avoid interpreting an end user certificate as a CA.
+basicConstraints=CA:FALSE
+
+# Here are some examples of the usage of nsCertType. If it is omitted
+# the certificate can be used for anything *except* object signing.
+
+# This is OK for an SSL server.
+# nsCertType = server
+
+# For an object signing certificate this would be used.
+# nsCertType = objsign
+
+# For normal client use this is typical
+# nsCertType = client, email
+
+# and for everything including object signing:
+# nsCertType = client, email, objsign
+
+# This is typical in keyUsage for a client certificate.
+# keyUsage = nonRepudiation, digitalSignature, keyEncipherment
+
+# This will be displayed in Netscape's comment listbox.
+nsComment = "hh_ca - OpenSSL Generated Certificate"
+
+# PKIX recommendations harmless if included in all certificates.
+subjectKeyIdentifier=hash
+authorityKeyIdentifier=keyid,issuer:always
+
+# This stuff is for subjectAltName and issuerAltname.
+# Import the email address.
+subjectAltName=email:copy
+# An alternative to produce certificates that aren't
+# deprecated according to PKIX.
+# subjectAltName=email:move
+
+# Copy subject details
+issuerAltName=issuer:copy
+
+nsCaRevocationUrl = http://www.hh.cz/ca-crl.pem
+#nsBaseUrl
+nsRevocationUrl = http://www.hh.cz/ca-crl.pem
+#nsRenewalUrl
+#nsCaPolicyUrl
+#nsSslServerName
+
+# This really needs to be in place for it to be a proxy certificate.
+proxyCertInfo=critical,language:id-ppl-anyLanguage,pathlen:3,policy:foo
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CS/certs.sh Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,7 @@
+#!/bin/bash
+read -sp "CA master pw: " PW
+for i in `seq 11500 11599`; do
+ for j in 0 1000; do
+ echo "$PW" | CN=$i openssl ca -config CS.cfg -in req/$((i+j)).req -out certs/$((i+j)).pem -batch -passin stdin || break
+ done
+done
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CS/keys_reqs.sh Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,7 @@
+#!/bin/bash
+for i in `seq 11600 11699`; do
+ for j in 0 1000 2000; do
+ # PROMPT=no DN=auto CN=$((i+j)) openssl req -config /home/local/etc/ssl/hh_ca.cfg -new -out req/$((i+j)).req -keyout keys/$((i+j)).key -nodes
+ CN=$((i+j)) openssl req -config /home/local/etc/ssl/hh_ca.cfg -new -out req/$((i+j)).req -keyout keys/$((i+j)).key -nodes
+ done
+done
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSa32/CS.S Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,556 @@
+ .include "DS.S"
+#-----------------------------------------------
+# S T A T I C V A R I A B L E S
+#-----------------------------------------------
+ .data
+ .global C.csP
+C.csP: .int 0 # -->CS
+ .text
+#-----------------------------------------------
+# M A I N R O U T I N E
+#-----------------------------------------------
+# takes values from ENV, establishes task for comm nodes and waits for their completion
+ ARGS
+ DS prgP # -->progname
+# returns nothing
+ PROLOC
+ DL this, CSL # CS - top level attr vector
+ DL deP # -->Debug
+ DL status # status returned from wait
+ DL pid # PID returned from wait
+ DL bad # cummulative subtask rc
+ DL s, 256 # string buf
+ EPILOC
+#-----------------------------------------------
+ .global _start
+_start:
+ PROLOG
+ lea this(bp), b # -->CS
+ mov b, C.csP # save -->CS
+ lea C.debug(b), a # -->Debug
+ mov a, deP(bp)
+# initialize static debug vars
+ push prgP(bp) # argv[0] -->prgname
+ call D.init # initialize static debug vars
+ push D.prgNameP
+# indetify itself
+ DEBID "client/server demo"
+ movl $1, C.debMaxLev(b) # default debug level
+# initialize top level ctrl values vector
+ ShareA # allocate shared structure
+ mov C.csP, b
+ mov a, C.shP(b) # save -->shared struct in vector
+ movl $0, S.msgs(a) # init msg cntr
+ movl $0, S.conns(a) # init connection cntr
+ push $1 # initial semaphore value
+ push $1 # semaphore shared between processes
+ lea S.counter_sem(a), c
+ push c
+# call sem_init # initialize shared counters semaphore
+# cmp $0, a
+# jz 0f
+# SYSERR "sem_init of shared counters semaphore"
+#0:
+ SYS sem_init
+# set default values
+ mov C.csP, b
+ movl $3, C.ttl(b) # default TTL
+ movl $11000, C.rp0(b) # default bind port for the 1. node in ring
+ movl $12000, C.mp0(b) # default bind port for the 1. node in mash
+ movl $0, C.rn(b) # default # of nodes in ring
+ movl $0, C.mn(b) # default # of nodes in mash
+ movl $0, C.ssl(b) # default ssl switch - 0=noSSL
+ movl $77, C.connTh(b) # conn retries threshold
+ mov $0f, a
+ mov a, C.txtP(b) # default payload text
+ jmp 1f
+0: .ascii "bla bla\0"
+1:
+# get control values from ENV
+# get max debug level
+ GETINTENV "DEB"
+ mov C.csP, b
+ mov a, C.debMaxLev(b)
+ cmp $-1, a # debug level -1 means search subroutines
+ jne 0f
+ call D.subr
+ jmp C.ret
+0:
+# get message payload text
+ push $0f
+ call getenv
+ cmp $0, a
+ jz 1f
+ mov C.csP, b
+ mov a, C.txtP(b)
+ jmp 1f
+0: .asciz "T"
+1:
+# get TTL
+ GETINTENV "TTL"
+ jz 0f
+ mov a, C.ttl(b)
+0:
+# get # of nodes in each topology
+ GETINTENV "I"
+ jz 0f
+ mov a, C.rn(b)
+ mov a, C.mn(b)
+0:
+# get # of nodes in ring
+ GETINTENV "RN"
+ jz 0f
+ mov a, C.rn(b)
+0:
+# get # of nodes in mash
+ GETINTENV "MN"
+ jz 0f
+ mov a, C.mn(b)
+0:
+ add C.rn(b), a
+ cmp $3, C.ssl(b)
+ jne 1f
+ add a, a # double # of nodes when both SSL and nonSSL
+1: mov C.shP(b), c # -->shared counters
+ mov a, S.act(c) # save # of active nodes
+# get first ring node's bind port#
+ GETINTENV "RP0"
+ jz 0f
+ mov a, C.rp0(b)
+0:
+# get first mash node's bind port#
+ GETINTENV "MP0"
+ jz 0f
+ mov a, C.mp0(b)
+0:
+# get random() seed
+ GETINTENV "RS"
+ jz 0f
+ push a
+ call srandom
+0:
+# get pacing interval (real num in seconds)
+ movl $0, C.pace.tv_sec(b)
+ movl $0, C.pace.tv_nsec(b)
+ movl $0, C.pacing(b)
+ push $0f
+ call getenv
+ jmp 1f
+0: .asciz "P"
+1:
+ test a, a # env P set ?
+ jz 3f # no
+ push a
+ call atof # convert to double
+ fstl (sp) # tempor save
+ mov C.csP, b
+ fisttpl C.pace.tv_sec(b) # truncated integral part = secs
+ fldl (sp)
+ fisubl C.pace.tv_sec(b) # fraction part
+ fimull 1f # * 10^9 = nanosecs
+ fistl C.pace.tv_nsec(b)
+ jmp 2f
+0: .double 0
+1: .int 1000000000 # 10^9
+2:
+ cmp $0, C.pace.tv_sec(b)
+ jnz 0f
+ cmp $0, C.pace.tv_nsec(b)
+ jz 3f
+0: movl $1, C.pacing(b)
+3:
+# get ssl switch value and save it as mask
+# switch: 0 = noSSL, 1 = SSL, 2 = both
+# mask: 01B=noSSL, 10B=SSL, 11B=both
+ GETINTENV "SSL" # returned zero means SSL=0 or SSL by default 0
+ inc a # change switch to mask
+ mov a, C.ssl(b)
+ cmp $1, a
+ je C.cont # no SSL
+ cmp $2, a
+ je 0f # only SSL
+ mov C.shP(b), a # -->shared mem
+ shll $1, S.act(a) # double # of active nodes when running both SSL and nonSSL
+# get SSL CA cert dir path
+0:
+ push $0f
+ call getenv
+ jmp 1f
+0: .asciz "CAP"
+1:
+ test a, a
+ jnz 1f
+ mov $0f, a
+ jmp 1f
+0: .asciz "/home/local/etc/ssl/certs/"
+1:
+ mov C.csP, b
+ mov a, C.caPathP(b) # save -->SSL CA certs path
+# get SSL dir path
+ push $0f
+ call getenv
+ jmp 1f
+0: .asciz "CEP"
+1:
+ test a, a
+ jz 0f # ceP not set, try to determine
+ mov C.csP, b
+ mov a, C.cePathP(b) # save -->SSL dir path
+ jmp 1f
+# determine home path (needed to locate SSL keys & certificates)
+0:
+ push prgP(bp) # -->first parm - progname w/ path
+ call dirname # get prog dirname
+ push a
+ call strlen # dirname length
+ lea 1f-0f(a), a # + suffix length
+ sub a, sp # allocate space for cePath
+ mov C.csP, b
+ mov sp, C.cePathP(b) # save -->SSL path
+ push prgP(bp)
+ call dirname # get home dirname
+ push a # -->home dirname
+ mov C.csP, b
+ push C.cePathP(b) # -->SSL path
+ call strcpy # copy dirname to SSL path
+ call strlen # end of dirname
+ push $0f # -->suffix
+ mov C.csP, b
+ mov C.cePathP(b), c
+ lea (c, a), a # -->end of dirname
+ push a
+ call strcpy # copy suffix to SSL path
+ jmp 1f
+0: .asciz "/../CS/"
+1:
+# testing sandbox
+ mov C.csP, b
+ pushl C.debMaxLev(b)
+ cmp $9, C.debMaxLev(b)
+ jne C.cont
+
+ LOG 9, "debug=%u, testing...", 1
+ DebugA
+ mov a, deP(bp)
+ DEBID "TEST"
+ LOG 9, "progress"
+
+ mov C.csP, b
+ mov C.shP(b), a
+ push S.act(a)
+ LOG 9, "shared act=%u"
+
+ jmp C.ret
+# normal execution
+C.cont:
+ push C.debMaxLev(b)
+ push C.ssl(b)
+ push C.pace.tv_nsec(b)
+ push C.pace.tv_sec(b)
+ push C.ttl(b)
+ push C.rn(b)
+ push C.mn(b)
+ push D.prgNameP
+ LOG 1, "pgm=%s, mash nodes=%d, ring nodes=%d, ttl=%d, pacing=%ld.%09ld, SSL mask=0x%02x, debug=%u", 8
+ testl $2, C.ssl(b)
+ jz 0f
+ push C.caPathP(b)
+ push C.cePathP(b)
+ LOG 1, "SSL path=%s, SSL CA path=%s"
+0:
+# create constellation processes RING/MASH, nonSSL/SSL
+ mov $0, c # iter counter
+ mov C.ssl(b), d # SSL mask (01b = noSSL, 10b = SSL, 11b = both)
+# iterate on SSL variants
+C.iterateOnSsl:
+ test $1, d # check lowest bit of mask
+ jz C.nextSsslVar # next SSL variant
+ pusha
+# create RING
+ SYS fork
+ jnz 1f # parent
+ popa
+ pushl $Cn.ring # RING topology
+ push c # use iter ctr as SSL switch
+ call Constellation # create RING constellation
+1: push a
+ LOG 5, "RING started in process %d"
+ lea 4(sp), sp
+# create MASH
+ SYS fork
+ jnz 1f # parent
+ popa
+ pushl $Cn.mash # MASH topology
+ push c # use iter ctr as SSL switch
+ call Constellation # create MASH constellation
+1: push a
+ LOG 5, "MASH started in process %d"
+ lea 4(sp), sp
+
+ popa
+C.nextSsslVar:
+ shr $1, d # shift to test next SSL bit
+ inc c # incr ctr
+ cmp $2, c
+ jl C.iterateOnSsl
+# wait for constellation processes completion
+ LOG 5, "waiting for constellations to terminate"
+ movl $0, bad(bp) # clear cummulative rc
+C.iterateOnWait:
+ lea status(bp), a
+ push a # -->return status of task
+ call wait
+ mov a, pid(bp) # save pid
+ cmp $-1, a # a task ended?
+ jne 0f # yes
+ call __errno_location
+ cmp $10, (a) # error == ECHILD ?
+ je C.ret # yes, no other subtasks
+ SYSERR "wait"
+0:
+# push status(bp) # status of task
+# push a # pid
+# LOG 5, "status returned from task %u: 0x%08x"
+ mov status(bp), d
+ test $0x7f, d
+ jnz 1f # task killed, ABEND
+ and $0xff00, d # task exited, extract rc
+ shr $8, d
+ or d, bad(bp) # accumulate rc
+ push d # task rc
+ push pid(bp) # task pid
+ LOG 5, "constellation task %u ended with exit(%d)"
+ jmp C.iterateOnWait # wait for other tasks
+1: push d
+ push pid(bp)
+ LOG 5, "constellation task %u killed, status=0x%x", 2
+2:
+# ABEND
+ LOG 0, "ABEND, kill all tasks"
+ pushl $15 # SIGTERM
+ pushl $0 # all tasks
+# call kill
+ SYS kill
+ pushl $1
+ call exit
+# normal end
+C.ret: movl C.csP, b # -->CS vector
+ movl C.shP(b), b # -->Share
+ pushl S.conns(b) # no. of connections made
+ pushl S.msgs(b) # no. of messages sent
+ LOG 1, "END, forwards=%d, connections=%d", 2
+ push bad(bp)
+ call exit
+#-----------------------------------------------
+# C O N S T E L L A T I O N O P E R A T I O N S
+#-----------------------------------------------
+ ARGS
+ DS ssl # ssl switch
+ DS topo # topology
+# returns: nothing
+ PROLOC
+ DL this, ConstellationL # this Constellation instance
+ DL thisP # -->this Constellation
+ DL deP # -->Debug
+ DL last # last node#
+ DL pid # pid returned from wait
+ DL stat # stat returned from wait
+ DL bad # "some node BAD" exit indicator
+ DL catched # count of returned node tasks
+ DL killed # count of killed node tasks
+ EPILOC
+#-----------------------------------------------
+ .global Constellation
+Constellation:
+ PROLOG
+ lea this(bp), b # -->this Constellation
+ mov b, thisP(bp) # save -->this Constellation
+# set debid
+ lea Cn.debug(b), a # -->Debug
+ mov a, deP(bp) # save -->Debug locally
+ mov C.csP, c # -->CS
+ cmp $Cn.ring, topo(bp) # ring topology ?
+ je 0f
+ mov C.mn(c), a # num. of nodes
+ mov a, Cn.nodes(b)
+ mov C.mp0(c), a # port # of fist node
+ mov a, Cn.first(b)
+ movl $Cn.mash, Cn.topo(b)
+ push $8f
+ jmp 1f
+0: mov C.rn(c), a # num. of nodes
+ mov a, Cn.nodes(b)
+ mov C.rp0(c), a # port # of fist node
+ mov a, Cn.first(b)
+ movl $Cn.ring, Cn.topo(b)
+ push $7f
+1: cmp $0, ssl(bp) # SSL ?
+ jz 2f # no
+ movl $1, Cn.ssl(b)
+ addl $500, Cn.first(b) # first SSL port #
+ push $6f
+ jmp 9f
+2: movl $0, Cn.ssl(b)
+ push $5f
+ jmp 9f
+5: .ascii "non\0"
+6: .ascii "\0"
+7: .ascii "RING\0"
+8: .ascii "MASH\0"
+9: DEBID "%sSSL %s", 2
+# check # of nodes
+ movL $0, bad(bp)
+ cmp $1, Cn.nodes(b) # num of nodes
+ jl Cn.ret # < 1 ? nothing to do
+ jg 0f
+ LOG 0, "1 node configuration not implemented yet"
+ jmp Cn.ret
+0: LOG 5, "initializing..."
+# determine divisor for random next node choise
+ mov $1, a
+ shl $31, a
+ not a # MAX_INT
+ xor d, d
+ divl Cn.nodes(b)
+ mov a, Cn.div(b) # save divisor (MAX_INT / nodes)
+# allocate "forward" indicator shared by nodes in constellation
+ push $0
+ push $-1
+ push $0x21 # PROT_READ | PROT_WRITE
+ push $0x03 # MAP_SHARED | MAP_ANONYMOUS
+ push $4
+ push $0
+# call mmap
+# cmp $-1, a
+# jne 0f
+# SYSERR "mmap"
+#0: mov thisP(bp), b
+ SYS mmap
+ mov a, Cn.forwP(b) # save -->forw
+ movl $1, (a) # enable forwarding
+ push Cn.nodes(b)
+ LOG 1, "%u nodes starting..."
+# start processes for all nodes in constellation
+ mov Cn.first(b), d # first node#
+ mov d, c
+ add Cn.nodes(b), c # last node + 1
+Cn.iterateOnFork:
+ pusha
+# call fork
+ SYS fork
+ cmp $0, a
+ jnz 1f # parent
+ popa
+ push d # node's port#
+ push b # -->Cnstlln
+ call Node
+1: mov a, pid(bp)
+ popa
+ push pid(bp) # nodes's pid
+ push d # node's port#
+ LOG 4, "node %u established in process %u", 2
+ inc d
+ cmp d, c # last node ?
+ jg Cn.iterateOnFork # no, continue forking
+# wait for completion of node processes
+ LOG 2, "all nodes established, waiting for them to terminate..."
+ movl $0, bad(bp) # accumulated return status of node tasks
+ movl $0, killed(bp) # num. of killed node tasks
+ movl $0, catched(bp) # num. of returned node tasks
+ lea -12(sp), sp # prepare space for loop
+Cn.iterateOnWait:
+ cmp $0, bad(bp) # constellation status still OK ?
+ je 1f # yes
+ mov Cn.forwP(b), a # -->forwarding switch
+ movl $0, (a) # disable forwarding between nodes
+1:
+ lea stat(bp), c
+ mov c, (sp) # -->return status of task
+ call wait
+ mov thisP(bp), b
+ cmp $-1, a # normal return from wait?
+ jne 0f # yes
+ call __errno_location
+ cmp $10, (a) # error == ECHILD ?
+ je Cn.allFinished # yes, no subtasks
+ SYSERR "wait"
+0:
+ incl catched(bp)
+ mov a, pid(bp) # save subtask's pid
+ mov a, (sp)
+ mov stat(bp), c # subtask return status
+ mov c, 4(sp)
+ test $0x7f, c # subtask ended by exit ?
+ jnz 2f # no, killed
+ and $0xff00, c # extract subtask rc
+ jz 1f # rc = 0
+ movl $1, bad(bp) # non zero rc, turn on BAD switch
+1: shr $8, c
+ mov c, 4(sp) # rc
+ mov a, (sp) # pid
+ LOG 4, "node process %u ended by exit(%d)"
+ jmp Cn.iterateOnWait # continue waiting for other subtasks
+2: movl $1, bad(bp) # subtask killed, turn on BAD switch
+ incl killed(bp) # counter of killed
+ mov c, 4(sp) # status
+ mov a, (sp) # pid
+ LOG 4, "node process %u killed, status=0x%x"
+ jmp Cn.iterateOnWait # continue waiting for other subtasks
+
+# opers of all nodes finished
+Cn.allFinished:
+ push killed(bp)
+ push catched(bp)
+ mov thisP(bp), b
+ push Cn.nodes(b)
+ cmp $0, bad(bp) # all nodes ended OK ?
+ je 0f # yes
+ push $7f
+ jmp 9f
+0: push $8f
+ jmp 9f
+7: .ascii "with ERROR\0"
+8: .ascii "OK\0"
+9: LOG 1, "ENDED %s, %u spawned, %u catched, %u killed"
+Cn.ret:
+ push bad(bp)
+ call exit
+#-----------------------------------------------
+# G E T I N T V A L U E S F R O M E N V
+#-----------------------------------------------
+ ARGS
+ DS key # -->env key string
+# returns int value or 0 if not found
+ PROLOC
+ EPILOC
+#-----------------------------------------------
+C.getArg:
+ PROLOG
+ pushl key(bp) # -->env key string
+ call getenv # get value
+ test a, a
+ jz 0f # not found in ENV
+ push a
+ call atoi # convert to int
+0:
+ EPILOG_R
+#-----------------------------------------------
+# A B N O R M A L E N D
+#-----------------------------------------------
+ ARGS
+ DS deP # -->Debug
+ PROLOC
+ EPILOC
+#-----------------------------------------------
+ .globl C.abend
+C.abend:
+ PROLOG
+ LOG 0, "ABEND"
+ push $15 # SIGTERM
+ push $0 # kill all
+# call kill
+ SYS kill
+ push $1
+ call exit
+#-----------------------------------------------
+ .end
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSa32/Constellation.S Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,155 @@
+ .include "DS.S"
+#-----------------------------------------------
+# C O N S T E L L A T I O N O P E R A T I O N S
+#-----------------------------------------------
+ ARGS
+ DS ssl # ssl switch
+ DS topo # topology
+# returns: nothing
+ PROLOC
+ DL this, ConstellationL # this Constellation instance
+ DL thisP # -->this Constellation
+ DL deP # -->Debug
+ DL len
+ DL pid
+ DL stat
+ DL bad # "some node BAD" exit indicator
+ EPILOC
+#-----------------------------------------------
+ .global Constellation
+Constellation:
+ PROLOG
+ lea this(bp), b # -->this Constellation
+ mov b, thisP(bp) # save -->this Constellation
+# lea this, b # -->this inst
+# set debid
+ lea Cn.debug(b), a # -->Debug
+ mov a, deP(bp) # save -->Debug locally
+ mov C.csP, c # -->CS
+ cmp $Cn.ring, topo(bp) # ring topology ?
+ je 0f
+ mov C.mn(c), a # num. of nodes
+ mov a, Cn.nodes(b)
+ mov C.mp0(c), a # port # of fist node
+ mov a, Cn.first(b)
+ movl $Cn.mash, Cn.topo(b)
+ push $8f
+ jmp 1f
+0: mov C.rn(c), a # num. of nodes
+ mov a, Cn.nodes(b)
+ mov C.rp0(c), a # port # of fist node
+ mov a, Cn.first(b)
+ movl $Cn.ring, Cn.topo(b)
+ push $7f
+1: cmp $0, ssl(bp) # SSL ?
+ jz 2f # no
+ movl $1, Cn.ssl(b)
+ addl $500, Cn.first(b) # first SSL port #
+ push $6f
+ jmp 9f
+2: movl $0, Cn.ssl(b)
+ push $5f
+ jmp 9f
+5: .ascii "non\0"
+6: .ascii "\0"
+7: .ascii "RING\0"
+8: .ascii "MASH\0"
+9: DEBID "%sSSL %s", 2
+# check # of nodes
+ movL $0, bad(bp)
+ cmp $1, Cn.nodes(b) # num of nodes
+ jl ConstellationR # < 1 ? nothing to do
+ jg 0f
+ LOG 0, "1 node configuration not implemented yet"
+ jmp ConstellationR
+0: LOG 5, "initializing..."
+# determine divisor for random next node choise
+ mov $1, a
+ shl $31, a
+ not a # MAX_INT
+ xor d, d
+ divl Cn.nodes(b)
+ mov a, Cn.div(b) # save divisor (MAX_INT / nodes)
+# allocate "forward" indicator shared by nodes in constellation
+ push $0
+ push $-1
+ push $0x21 # PROT_READ | PROT_WRITE
+ push $0x03 # MAP_SHARED | MAP_ANONYMOUS
+ push $4
+ push $0
+ call mmap
+ cmp $-1, a
+ jne 0f
+ SYSERR "mmap"
+0: mov thisP(bp), b
+ mov a, Cn.forwP(b) # save -->forw
+ movl $1, (a) # enable forwarding
+ mov C.csP, c # -->CS
+ push C.ttl(c)
+ push Cn.nodes(b)
+ LOG 1, "%d node(s), ttl=%d starting..."
+# start processes for all nodes in constellation
+ mov Cn.first(b), d # first node#
+ mov d, c
+ add Cn.nodes(b), c # last node + 1
+0: pusha
+ call fork
+ cmp $0, a
+ jnz 1f # parent
+ popa
+ push d # node's port#
+ push b # -->Cnstlln
+ call Node
+1: mov a, pid(bp)
+ popa
+ push pid(bp) # nodes's pid
+ push d # node's port#
+ LOG 3, "node %u established in process %u", 2
+ inc d
+ cmp d, c # last node ?
+ jg 0b # no, continue forking
+# wait for completion of node processes
+ LOG 5, "all nodes established, waiting for them to terminate..."
+ movl $0, bad(bp)
+0: lea stat(bp), a
+ cmp $0, bad(bp) # status still OK ?
+ je 1f # yes
+ mov Cn.forwP(b), a # -->forward switch
+ movl $0, (a) # disable forwarding
+1: mov a, (sp) # -->return status of task
+ call wait
+ mov thisP(bp), b
+ cmp $0, a # normal return from wait?
+ jl 3f # no, all subtasks finshed
+ mov a, pid(bp) # save subtask's pid
+ mov stat(bp), a
+ test $0x7f, a # subtask ended by exit ?
+ jnz 2f # no, killed
+ and $0xff00, a # extract subtask rc
+ jz 1f # rc = 0
+ movl $1, bad(bp) # non zero rc, turn on BAD switch
+1: push a # rc
+ push pid(bp) # pid
+ LOG 4, "node process %u ended with exit(%d)"
+ jmp 0b # continue waiting for other subtasks
+2: movl $1, bad(bp) # subtask killed, turn on BAD switch
+ push pid(bp)
+ LOG 4, "node process %u killed"
+ jmp 0b # continue waiting for other subtasks
+
+# opers of all nodes finished
+3: cmp $0, bad(bp) # all nodes ended OK ?
+ je 0f # yes
+ push $7f
+ jmp 9f
+0: push $8f
+ jmp 9f
+7: .ascii "with ERROR\0"
+8: .ascii "OK\0"
+9: LOG 1, "ENDED %s"
+ConstellationR:
+ push bad(bp)
+ call exit
+#-----------------------------------------------
+ .end
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSa32/DS.S Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,514 @@
+ .equ a, %eax
+ .equ b, %ebx
+ .equ c, %ecx
+ .equ d, %edx
+ .equ bp, %ebp
+ .equ sp, %esp
+ .equ di, %edi
+#-----------------------------------------------
+# M A C R O D E C L A R A T I O N S
+#-----------------------------------------------
+# subroutines list
+ .macro SUBRLIST
+ SUBR _start
+ SUBR D.init
+ SUBR D.setId
+ SUBR D.log
+ SUBR D.getTs
+ SUBR D.subr
+ SUBR Node
+ SUBR N.bind
+ SUBR N.conn
+ SUBR N.acc
+ SUBR N.closeSocket
+ SUBR N.read
+ SUBR N.write
+ SUBR N.get
+ SUBR N.put
+ SUBR N.sslErr
+ SUBR Data
+ SUBR Da.load
+ SUBR Da.chk
+ SUBR Da.unl
+ SUBR Da.dttl
+ SUBR Da.ttl
+ SUBR Da.getTs
+ .endm
+#-----------------------------------------------
+# push subroutine addr, prints its name and addr
+# leave subr name and addr on stack
+ .macro SUBR subr:req
+ push $\subr
+ push $9f
+ push $8f
+ push stderr
+ call fprintf
+ jmp 7f
+9: .asciz "\subr"
+8: .asciz "%-12s\t%p\n"
+7: lea 8(sp), sp
+ .endm
+#-----------------------------------------------
+# prepare to define subroutine args; use macro DS to define args
+ .macro ARGS
+ ac = 40
+ .endm
+#-----------------------------------------------
+# prepare to define subroutine args; use macro DS to define args
+ .macro M_ARGS
+ ac = 40
+ DS thisP
+ .endm
+#-----------------------------------------------
+# declare storage
+ .macro DS id:req, len=4
+ \id = ac
+ ac = ac + \len
+ .endm
+#-----------------------------------------------
+# prepare to define subroutine local vars
+ .macro PROLOC
+ ac = 0
+ .endm
+#-----------------------------------------------
+# prepare to define subroutine local vars
+ .macro M_LOCAL
+ ac = 0
+ DL deP
+ .endm
+#-----------------------------------------------
+# declare local var
+ .macro DL id:req, len=4
+ ac = ac + \len
+ \id = -ac
+ .endm
+#-----------------------------------------------
+# terminate subroutine local vars definition
+ .macro EPILOC
+ locL = ac
+ .endm
+#-----------------------------------------------
+# subroutine prolog
+# expects length of local storage under the name "locL"
+ .macro PROLOG
+ pusha
+ enter $locL, $0
+ .endm
+#-----------------------------------------------
+# subroutine prolog
+# expects length of local storage under the name "locL"
+ .altmacro
+ .macro M_PROLOG m_prefix:req, m_name:req
+ .global \m_prefix\().\m_name
+\m_prefix\().\m_name:
+ pusha
+ enter $ac, $0
+ mov thisP(bp), b
+ lea \m_prefix\().debug(b), a
+ mov a, deP(bp)
+ .endm
+ .noaltmacro
+#-----------------------------------------------
+# subroutine epilog without return value
+ .macro EPILOG
+ leave
+ popa
+ ret
+ .endm
+#-----------------------------------------------
+# subroutine epilog with return value in EAX
+# expects length of local storage under the name "locL"
+ .macro EPILOG_R
+ leave
+ mov a, 28(sp)
+ popa
+ ret
+# mov -36(sp), a
+ .endm
+#-----------------------------------------------
+# ABEND at system routine err
+ .macro ERR causer:req
+ pushl $9f
+ call printf
+ pushl $1
+ call exit
+9: .asciz "\causer: %m\n"
+ .endm
+#-----------------------------------------------
+# ABEND at system routine err using debug structure
+# expects deP(bp)-->Debug inst
+ .macro SYSERR causer:req
+ push $9f # -->format
+ push $0 # no. of values
+ push $0 # msg debug level
+ pushl deP(bp) # -->Debug instance
+ call D.log # print msg
+ call C.abend
+9: .asciz "\causer: %m"
+ .endm
+#-----------------------------------------------
+# print SSL err msg queue and then abend
+# -->Debug in deP(bp) is expected
+ .macro SSLERR msg:req
+ pushl $9f
+ pushl deP(bp)
+ call N.sslErr
+9: .asciz "\msg"
+ .endm
+#-----------------------------------------------------------
+ .macro SYS name:req
+ call \name
+ cmp $-1, a
+ jne 8f
+ SYSERR "\name"
+8: mov thisP(bp), b
+ test a, a
+ .endm
+#-----------------------------------------------
+# get integer value from ENV
+ .macro GETINTENV key:req
+ pushl $9f
+ call C.getArg
+ lea 4(sp), sp
+ cmp $0, a
+ jmp 8f
+9: .asciz "\key"
+8:
+ .endm
+#-----------------------------------------------
+# set local debug ID
+# supposed: local deP-->Debug
+# expected on the stack:
+# \argc values to be put in debug ID according to format
+# \argc may be zero
+ .macro DEBID format:req, argc=0
+ push $9f # -->format
+ push $\argc # no. of values
+ pushl deP(bp) # -->Debug instance
+ call D.setId # put debug ID to Debug inst
+ jmp 8f
+9: .asciz "\format"
+8: lea 12(sp), sp
+ .endm
+#-----------------------------------------------
+# debug output
+# supposed: local deP-->Debug
+# expected on the stack:
+# \argc values to be printed by \format at debug level \level
+# \argc may be zero
+# all regs are preserved
+ .macro LOG level:req, format:req, argc=7
+ push $9f # -->format
+ push $\argc # no. of values
+ push $\level # msg debug level
+ pushl deP(bp) # -->Debug instance
+ call D.log # print msg
+ jmp 8f
+9: .asciz "\format"
+8: lea 16(sp), sp
+ .endm
+#-----------------------------------------------
+# flat print
+# sp, bp are preserved
+ .macro PR format:req
+ push $9f
+ push stderr
+ call fprintf
+ jmp 8f
+9: .asciz ">>> \format\n"
+8: lea 8(sp), sp
+ .endm
+#-----------------------------------------------
+# print ip, bp, sp before subroutine call
+# to be placed directly before a call instruction
+# expected deP(bp) as -->Debug
+# all regs are preserved
+ .macro B_CALL level:req, label="BEFORE SUBR CALL "
+ push a
+ lea 4(sp), a
+ push a
+ push bp
+ pushl $7f+5
+ LOG \level, "\label: ip: %p, bp: %p, sp: %p", 3
+ lea 12(sp), sp
+ pop a
+ jmp 7f
+7:
+ .endm
+#-----------------------------------------------
+# print ip, bp, sp saved in sburoutine's stack frame
+# to be placed anywhere before return seq and after deP(bp) is set
+# expected deP(bp) as -->Debug
+# all regs are preserved
+ .macro B_RET level:req, label="BEFORE SUBR RETURN"
+ push a
+ lea 8(bp), a
+ push a
+ push (bp)
+ push 4(bp)
+ LOG \level, "\label: ip: %p, bp: %p, sp: %p", 3
+ lea 12(sp), sp
+ pop a
+ jmp 7f
+7:
+ .endm
+#-----------------------------------------------
+# print ip, bp, sp retained for return from sburoutine
+# all regs are preserved
+ .macro A_CALL label="AFTER CALL"
+ pusha
+ lea 8(bp), a
+ push a
+ push (bp)
+ push 4(bp)
+ pushl $9f
+ call printf
+ lea 16(sp), sp
+ popa
+ jmp 8f
+9: .asciz "\label: ip: %p, bp: %p, sp: %p\n"
+8:
+ .endm
+#-----------------------------------------------
+ .macro SLEEPER time:req, ident=">>>"
+ pusha
+ LOG 7, "\ident: sleeping \time"
+ pushl $\time
+ call sleep
+ lea 4(sp), sp
+ LOG 7, "\ident: woken up"
+ popa
+ .endm
+#-----------------------------------------------
+# A B S T R A C T D A T A D E C L A R A T I O N S
+#-----------------------------------------------
+# rSA reg save area
+ ac = 0
+ DS retSA
+ DS regSA, 32
+ DS bpSA
+ rSAL = ac
+#-----------------------------------------------
+# Timeval
+ ac = 0
+ DS secs
+ DS usecs
+ timevalL = ac
+#-----------------------------------------------
+# AddrInfo IP family net addr block
+ ac = 0
+ DS ai_flags
+ DS ai_family
+ DS ai_socktype
+ DS ai_protocol
+ DS ai_addrlen
+ DS ai_addrP # -->sockaddr
+ DS ai_canonnameP
+ DS ai_nextP
+ AddrInfoL = ac
+#-----------------------------------------------
+ SIGUSR2 = 12
+ SIG_IGN = 1
+ SIG_UNBLOCK = 1
+# sigaction signal handler definition
+ ac = 0
+ DS sa_handler
+ DS sa_mask, 128
+ DS sa_flags
+ DS sa_unused
+ SigActionL = ac
+#-----------------------------------------------
+# SockAddr
+ ac = 0
+ DS sa_family, 2
+ DS sa_data, 14
+ SockAddrL = ac
+#-----------------------------------------------
+# timeval struct
+ ac = 0
+ DS Ti.secs
+ DS Ti.usecs
+ timevalL = ac
+#-----------------------------------------------
+# Debug debug info
+ ac = 0
+ D.idL = 128
+ DS D.id, D.idL # debug ID of process
+ D.msgL = 256
+ DS D.msg, D.msgL # debug msg workspace
+ DebugL = ac
+
+ .macro DebugA # returns -->Debug
+ push $DebugL
+ call malloc
+ cmp $0, a
+ ja 8f
+ SYSERR "malloc"
+8: movl $0, D.id(a)
+ .endm
+#-----------------------------------------------
+# CS top level attributes
+ ac = 0
+ DS C.debug, DebugL
+ DS C.debMaxLev # max level of debug msgs to be printed
+ DS C.txtP # -->text to be sent in messages
+ DS C.ttl # TTL for circulating msgs
+ DS C.mp0 # TCP port of first mash node
+ DS C.mn # intended # of nodes in mash
+ DS C.rp0 # TCP port of first ring node
+ DS C.rn # intended # of nodes in ring
+ DS C.pace.tv_sec # timespec.tv_sec
+ DS C.pace.tv_nsec # timespec.tv_nsec
+ DS C.pacing # pacing indicator
+# DS C.rs # random() seed
+ DS C.ssl # ssl mask: 01B=noSSL, 10B=SSL, 11B=both
+ DS C.connTh # connection retry threshhold
+ DS C.shP # -->shared counters
+ DS C.pathP # -->pathname to application home dir
+ DS C.cePathP # -->pathname to SSL dir
+ DS C.caPathP # -->pathname to SSL CA CERT dir
+ CSL = ac
+
+ .macro CSA # returns -->CS
+ push $CSL
+ call malloc
+ cmp $0, a
+ ja 8f
+ SYSERR "malloc"
+8:
+ .endm
+#-----------------------------------------------
+# Share counters shared between procs or threads
+ ac = 0
+ DS S.counter_sem, 16 # semaphore for counters
+ DS S.conns # overall connections counter both in ring and mash
+ DS S.msgs # overall forewards# both in ring and mash
+ DS S.act # active node processes counter
+ DS S.mash_open_client_count # no. of opened clients in mash
+ DS S.mash_open_SSL_client_count # no. of opened SSL clients in mash
+ ShareL = ac
+
+ .macro ShareA # returns -->Share
+ push $0
+ push $-1
+ push $0x21 # PROT_READ | PROT_WRITE
+ push $0x03 # MAP_SHARED | MAP_ANONYMOUS
+ push $ShareL
+ push $0
+ call mmap
+ cmp $-1, a
+ jne 8f
+ SYSERR "mmap"
+8:
+ .endm
+#-----------------------------------------------
+# Data - container for data sent through connection topology
+# Container Header
+ ac = 0
+ DS H.ttl # msg TTL
+ DS H.ts # timestamp
+ DS H.lport # listening TCP port
+ HeaderL = ac
+# Container Payload
+ ac = 0
+# Pa.loadL = 256
+ DS Pa.ts # timestamp
+ DS Pa.text, 0 # load sent in msg
+ PayloadL = ac
+# Container to be send
+ ac = 0
+ DS Co.hdr, HeaderL # Header
+ DS Co.payl, PayloadL # Payload
+# ContainerL = ac
+# Data instantion
+ ac = 0
+ DS Da.debug, DebugL # Debug
+ DS Da.contP # -->Container
+ DS Da.datalen # container length
+ DataL = ac
+
+ .macro DataA # returns -->Data
+ push $DataL
+ call malloc
+ cmp $0, a
+ ja 8f
+ SYSERR "malloc"
+8:
+ .endm
+#-----------------------------------------------
+# Node general attributes of node
+ ac = 0
+ DS N.debug, DebugL # debug info
+ DS N.cnstlnP # -->Constellation block
+ DS N.topo # constellation topology
+ DS N.locPort # TCP port node binds to
+ DS N.first # port # of first node in constellation
+ DS N.last # port # of last node in constellation
+ DS N.nodes # number of nodes
+ DS N.div # random node choise divisor (MAX_INT / nodes)
+ DS N.kicker # kicker indicator
+ DS N.forwP # -->shared forward indicator
+ DS N.closing # closing in progress indicator
+ DS N.ssc # server side socket
+ DS N.data, DataL # data block
+ DS N.dataP # -->data block
+ DS N.srvSideP # -->array of server side Sockets
+ DS N.cliSideP # -->array of client side Sockets
+ DS N.sockArrLen # socket array length
+ DS N.next # next node's port#
+ DS N.nfds # highest FD# in FD sets
+ DS N.rs, 128 # read FD set
+ DS N.es, 128 # exceptional FD set
+ DS N.t, timevalL # timeval for select
+ DS N.ssl # SSL switch: 0=noSSL, 1=SSL
+ DS N.ctxP # -->SSL context
+ DS N.pid # process ID
+ DS N.ptid # closing thread ID
+ NodeL = ac
+
+ .macro NodeA # returns -->Node
+ push $NodeL
+ call malloc
+ cmp $0, a
+ ja 8f
+ SYSERR "malloc"
+8:
+ .endm
+#-----------------------------------------------
+# SocketInfo comm socket info
+ ac = 0
+ DS So.remPort # TCP port on remote site
+ DS So.sc # comm socket#
+ DS So.sslP # -->SSL structure
+ SocketInfoL = ac
+
+ .macro SocketA # returns -->Socket
+ push $SocketL
+ call malloc
+ cmp $0, a
+ ja 8f
+ SYSERR "malloc"
+8:
+ .endm
+#-----------------------------------------------
+# Constellation attributes of constellation of nodes (mash or ring)
+ ac = 0
+ DS Cn.debug, DebugL # debug info
+ DS Cn.topo # Constellation topology
+ Cn.ring = 0
+ Cn.mash = 1
+ DS Cn.first
+ DS Cn.nodes
+ DS Cn.div # random node choise divisor (MAX_INT / nodes)
+ DS Cn.ssl
+ DS Cn.forwP
+ ConstellationL = ac
+
+ .macro ConstellationA # returns -->Constellation
+ push $ConstellationL
+ call malloc
+ cmp $0, a
+ ja 8f
+ SYSERR "malloc"
+8:
+ .endm
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSa32/Data.S Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,270 @@
+ .include "DS.S"
+ .text
+#-----------------------------------------------
+# C O N S T R U C T O R
+#-----------------------------------------------
+ ARGS
+ DS thisP # -->Data inst
+ DS callerDeP # -->originator's Debug
+ PROLOC
+ DL deP # -->Debug
+ EPILOC
+#-----------------------------------------------
+ .global Data
+Data:
+ PROLOG
+ mov thisP(bp), b # -->Data inst
+ lea Da.debug(b), a # -->Debug
+ mov a, deP(bp) # init local -->Debug
+# construct debug ID from originator debID
+ mov callerDeP(bp), a # -->caller's Debug
+ lea D.id(a), a
+ push a # -->caller's DebugId
+ DEBID "%s DATA", 1
+# calculate container size
+ mov C.csP,a
+ push C.txtP(a)
+ call strlen
+ addl $HeaderL, a
+ addl $PayloadL, a
+ inc a
+ mov thisP(bp), b # -->Data inst
+ mov a, Da.datalen(b) # save container length
+# allocate container
+ push a
+ call malloc
+ test a, a
+ jnz 0f
+ SYSERR "malloc"
+0:
+ mov thisP(bp), b # -->Data inst
+ mov a, Da.contP(b) # save -->container
+# initialize container
+ mov Da.contP(b), c # -->Container
+ lea Co.hdr(c), a # -->Header
+ movl $0, H.ttl(a)
+ movl $0, H.ts(a)
+ push $0f
+ lea Co.payl(c), a # -->Payload
+ lea Pa.text(a), a
+ push a
+ call strcpy # init Payload.text
+ LOG 5, "Data instance established"
+ EPILOG
+0: .asciz "EMPTY"
+#-----------------------------------------------
+# L O A D P A Y L O A D
+#-----------------------------------------------
+# returns -->data container
+ M_ARGS
+ DS ttl
+ DS loadP # -->text to be sent
+ M_LOCAL
+ DL t, timevalL # timestamp buf
+ M_PROLOG Da, load
+ LOG 5, "loading..."
+ mov Da.contP(b), c # -->Container
+# fill header
+ lea Co.hdr(c), d # -->containner header
+ mov ttl(bp), a
+ mov a, H.ttl(d) # init TTL in header
+# get timestamp
+ call D.getTs # get timestamp
+ mov a, H.ts(d) # timestamp to header
+# init payload
+ push loadP(bp) # -->loaded text
+ lea Co.payl(c), a # -->Payload
+ lea Pa.text(a), a
+ push a
+ call strcpy # copy text to payload
+# return -->Data
+ push Da.datalen(b)
+ LOG 5, "payload loaded to container, len=%u"
+ mov Da.contP(b), a # return -->container
+ EPILOG_R
+#-----------------------------------------------
+# G E T D A T A C O N T A I N E R L E N G T H
+#-----------------------------------------------
+# returns length of container filled with data
+ M_ARGS
+ M_LOCAL
+ M_PROLOG Da, getDataLen
+ mov Da.datalen(b), a
+ EPILOG_R
+#-----------------------------------------------
+# G E T D A T A C O N T A I N E R P O I N T E R
+#-----------------------------------------------
+ M_ARGS
+ M_LOCAL
+ M_PROLOG Da, getContP
+ mov Da.contP(b), a
+ EPILOG_R
+#-----------------------------------------------
+# C H E C K P A Y L O A D
+#-----------------------------------------------
+# returns boolean: original text == received text
+ M_ARGS
+ M_LOCAL
+ M_PROLOG Da, chk
+# get original payload
+ mov C.csP, c # -->CS
+ pushl C.txtP(c) # -->orig text
+# get payload from container
+ mov Da.contP(b), a # -->Container
+ lea Co.payl(a), a # -->Payload
+ lea Pa.text(a), a # -->payload text
+ push a
+ call strcmp
+ xor c, c
+ test a, a # 0 = texts are equal
+ setz %cl
+ mov c, a
+ EPILOG_R
+#-----------------------------------------------
+# U N L O A D P A Y L O A D
+#-----------------------------------------------
+# returns -->payload text
+ M_ARGS
+ M_LOCAL
+ M_PROLOG Da, unl
+ mov thisP(bp), b # -->Data inst
+ lea Da.debug(b), a
+ mov a, deP(bp)
+# get payload from container
+ mov Da.contP(b), a # -->Container
+ lea Co.payl(a), a # -->Payload
+ lea Pa.text(a), a # -->Payload.text
+ EPILOG_R
+#-----------------------------------------------
+# D E C R E M E N T T T L
+#-----------------------------------------------
+# decrement TTL in data container header, save it and return it
+ M_ARGS
+ M_LOCAL
+ M_PROLOG Da, dttl
+ mov Da.contP(b), a # -->Container
+ lea Co.hdr(a), a # -->container header
+ decl H.ttl(a) # TTL--
+ mov H.ttl(a), a # return TTL
+ EPILOG_R
+#-----------------------------------------------
+# G E T T T L
+#-----------------------------------------------
+ M_ARGS
+ M_LOCAL
+ M_PROLOG Da, ttl
+ mov Da.contP(b), a # -->Container
+ lea Co.hdr(a), a # -->container header
+ mov H.ttl(a), a # return TTL
+ EPILOG_R
+#-----------------------------------------------
+# P U T l I S T E N P O R T T O H E A D E R
+#-----------------------------------------------
+ ARGS
+ DS thisP # -->Data inst
+ DS port
+# returns port value
+ PROLOC
+ DL deP # -->Debug
+ EPILOC
+#-----------------------------------------------
+ .global Da.putPort
+Da.putPort:
+ PROLOG
+ mov thisP(bp), b # -->Data inst
+ lea Da.debug(b), a
+ mov a, deP(bp) # -->Debug
+
+ mov Da.contP(b), c # -->Container
+ lea Co.hdr(a), c # -->container header
+ mov port(bp), a
+ mov a, H.lport(c)
+ EPILOG_R
+#-----------------------------------------------
+# G E T l I S T E N P O R T F R O M H E A D E R
+#-----------------------------------------------
+ M_ARGS
+ M_LOCAL
+ M_PROLOG Da, getPort
+ mov Da.contP(b), c # -->Container
+ lea Co.hdr(c), c # -->container header
+ mov H.lport(c), a
+ EPILOG_R
+#-----------------------------------------------
+# G E T T I M E S T A M P F R O M H E A D E R
+#-----------------------------------------------
+ M_ARGS
+ M_LOCAL
+ M_PROLOG Da, getTs
+ mov Da.contP(b), a # -->Container
+ lea Co.hdr(a), a # -->container header
+ mov H.ts(a), a # return timestamp
+ EPILOG_R
+#-----------------------------------------------
+# S A B O T A G E T E X T
+#-----------------------------------------------
+ ARGS
+ DS thisP # -->Data inst
+ PROLOC
+ DL deP # -->Debug
+ EPILOC
+#-----------------------------------------------
+ .global Da.sabotage
+Da.sabotage:
+ PROLOG
+ lea Da.contP(b), a # -->Container
+ lea Co.payl(a), a # -->Payload
+ lea Pa.text(a), a # -->Payload.text
+ movl $'?', (a)
+ EPILOG
+#-----------------------------------------------
+# C R E A T E D I G E S T F R O M T E X T
+#-----------------------------------------------
+# returns -->24 chars payload text digest
+ M_ARGS
+ DS digestP # -->text digest buffer
+ M_LOCAL
+ DL textP
+ M_PROLOG Da, digest24
+# check length of payload text
+ mov Da.contP(b), a # -->Container
+ lea Co.payl(a), a # -->Payload
+ lea Pa.text(a), a # -->Payload.text
+ mov a, textP(bp)
+ push a
+ call strlen
+ cmp $24, a # payload text length < 24 ?
+ jl Da.digest24Direct # yes, direct copy
+# create digest from longer text
+ push $8
+ push textP(bp)
+ push digestP(bp)
+ call strncpy # copy beg. of text
+ push $0f
+ mov digestP(bp), a # -->digest
+ add $8, a
+ push a # -->digest+8
+ call strcpy
+ jmp 1f
+0: .asciz "-------"
+1: push textP(bp)
+ call strlen
+ mov textP(bp), c
+ add a, c
+ lea -8(c), c # -->end of text - 8
+ push c
+ mov digestP(bp), a
+ lea 15(a), a # -->digest+15
+ push a
+ call strcpy
+ jmp Da.digest24Ex
+# directly copy shorter text
+Da.digest24Direct:
+ push textP(bp)
+ push digestP(bp)
+ call strcpy
+Da.digest24Ex:
+ mov digestP(bp), a # -->digest
+ EPILOG_R
+#-----------------------------------------------
+ .end
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSa32/Debug.S Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,273 @@
+ .include "DS.S"
+#-----------------------------------------------
+# A B S T R A C T S T R U C T U R E S
+#-----------------------------------------------
+#-----------------------------------------------
+# S T A T I C V A R I A B L E S
+#-----------------------------------------------
+ .data
+D.t0: .int 0 # initial timestamp in usecs
+D.semP: .int 0 # -->semaphore for debug output
+ .globl D.prgNameP
+D.prgNameP:
+ .int 0 # -->prog name
+#-----------------------------------------------
+# D E B U G I N I T
+#-----------------------------------------------
+# initialize static debug vars
+ ARGS
+ DS prgP # -->program name
+# returns: nothing
+ PROLOC
+ EPILOC
+#-----------------------------------------------
+ .text
+ .global D.init
+D.init:
+ PROLOG
+ push prgP(bp)
+ call strlen
+ inc a
+ push a
+ call malloc
+ mov a, D.prgNameP # save -->prog name for debugging
+ push prgP(bp)
+ push a
+ call strcpy
+ movl $0, D.t0
+ call D.getTs # get actual time - t0 = actual time
+ mov a, D.t0 # set t0 to actual time
+
+ jmp 3f
+ push $0
+ push $t
+ call gettimeofday # get initial timestamp t0
+ cmp $0, a
+ jz 0f
+ ERR "gettimeofday"
+0: lea t, b # compute (1000000*secs+usecs)
+ mov secs(b), c
+ mov $1000000, a
+ mul c # 1000000*secs
+ add usecs(b), a # 1000000*secs+usecs
+ movl a, D.t0 # save low 32 bits of timestamp in t0
+3:
+ push $0 # allocate semaphore for debug output
+ push $-1
+ push $0x21 # PROT_READ | PROT_WRITE
+ push $0x03 # MAP_SHARED | MAP_ANONYMOUS
+ push $16 # semaphore size
+ push $0
+ call mmap # get shared storage for semaphore
+ cmp $-1, a
+ jne 0f
+ ERR "mmap"
+0: mov a, D.semP # save -->semaphore
+
+ push $1 # initial semaphore value
+ push $1 # semaphore shared between processes
+ pushl D.semP
+ call sem_init # initialize semaphore
+ cmp $0, a
+ jz 0f
+ ERR "sem_init of debug semaphore"
+0:
+ EPILOG
+#-----------------------------------------------
+# S E T D E B U G I D
+#-----------------------------------------------
+# sets debug ID according to format
+ ARGS
+ DS deP # -->Debug instance
+ DS argc # # of values to be put into deb ID
+ DS formP # -->format
+# ... # values
+# returns: nothing
+ PROLOC
+ EPILOC
+#-----------------------------------------------
+ .global D.setId
+D.setId:
+ PROLOG
+# format debug ID
+ mov argc(bp), c # no. of values
+0: mov formP(bp,c,4), a # push values and format
+ push a
+ dec c
+ jns 0b # iterate on values
+ pushl $D.idL # max deb ID len
+ mov deP(bp), b # -->Debug instance
+ lea D.id(b), a # -->DebId
+ push a
+ call snprintf # construct DebId
+ cmp $0, a
+ jnl 0f
+ ERR "DEBID snprintf"
+0:
+ LOG 6, "debid '%s' set", 1
+ EPILOG
+#-----------------------------------------------
+# P R I N T D E B U G M S G
+#-----------------------------------------------
+# outputs to stderr the synchronized debug msg with time diff from program start
+# on appripriate debug level, flushes stderr
+ ARGS
+ ac = 40
+ DS deP # -->Debug instance
+ DS level # msg debug level
+ DS argc # # of values to be used in msg
+ DS form # -->format
+# ... # values
+# returns: nothing
+ PROLOC
+ ac = 0
+ EPILOC
+#-----------------------------------------------
+ .global D.log
+D.log:
+ PROLOG
+
+ mov C.csP, c
+ mov level(bp), a # msg debug level
+ cmp C.debMaxLev(c), a # max debug level >= msg debug level ?
+ jg 3f # nothing to print, leave
+ cmp $7, C.debMaxLev(c) # on max debug level = 7 print level 7 only
+ jne 0f
+ cmp $7, a # msg debug level = 7 ?
+ jne 3f
+# prepare output string of msg
+0: mov argc(bp), c # no. of values
+0: mov form(bp,c,4), a # copy values and format
+ push a
+ dec c
+ jns 0b
+ push $D.msgL # msg buffer len
+ mov deP(bp), b # -->Debug instance
+ lea D.msg(b), a # -->msg buffer
+ push a
+ call snprintf # construct msg in buffer
+ cmp $0, a
+ jnl 0f
+ ERR "LOG snprintf"
+# synchronize with other tasks
+0: pushl D.semP
+ call sem_wait # synchronize
+ cmp $0, a
+ jz 0f
+ ERR "LOG sem_wait"
+# get timestamp
+0: call D.getTs # time from beg of program in usecs
+# print debug msg
+ mov deP(bp), b # -->Debug instance
+ lea D.msg(b), c # -->msg text
+ push c
+ lea D.id(b), c # -->instance debug ID
+ push c
+ push a # tdiff in usecs
+ push $1f
+ push stderr
+ call fprintf
+ jmp 2f
+1: .ascii "%09u %s: %s\n\0"
+# flush stderr
+2: push stderr
+ call fflush # flush stderr
+# free semaphore
+ pushl D.semP
+ call sem_post # free semaphore
+ cmp $0, a
+ jz 3f
+ ERR "LOG sem_post"
+3:
+ EPILOG
+#-----------------------------------------------
+# G E T T I M E S T A M P I N U S E C S
+#-----------------------------------------------
+# returns timestamp in usecs from beg of program
+# returns: int value
+ PROLOC
+ DL t, timevalL # timestamp buffer
+ EPILOC
+#-----------------------------------------------
+ .global D.getTs
+D.getTs:
+ PROLOG
+ push $0
+ lea t(bp), b # -->timeval buffer
+ push b
+ call gettimeofday
+# compute time delta from program start = t-t0 in usecs
+# tdiff = (1000000*ts.secs+ts.usecs)-t0
+ lea t(bp), b # actual timestamp
+ mov secs(b), c # t.secs
+ mov $1000000, a
+ mul c # 1000000*t.secs
+ add usecs(b), a # 1000000*t.secs+t.usecs
+ sub D.t0, a # 1000000*t.secs+t.usecs-t0
+ EPILOG_R
+#-----------------------------------------------
+# S U B R O U T I N E S L I S T
+#-----------------------------------------------
+# print subroutines names and addresses
+# localize address from env ADDR= in subroutine
+ PROLOC
+ DL wAddr # wanted addr
+ EPILOC
+#-----------------------------------------------
+ .global D.subr
+D.subr:
+ PROLOG
+ movl $0, wAddr(bp)
+ pushl $0f
+ call getenv # get env ADDR
+ lea 4(sp), sp
+ jmp 1f
+0: .ascii "ADDR\0"
+1: cmp $0, a
+ jz 0f # no addr wanted
+ pushl $16
+ pushl $0
+ push a
+ call strtol # convert str to hex
+ lea 12(sp), sp
+ mov a, wAddr(bp) # searched addr
+0:
+ pushl $0 # end of list
+ SUBRLIST # print and push list of subroutines
+ cmp $0, wAddr(bp) # address wanted ?
+ je D.subrR
+ mov wAddr(bp), d
+ mov sp, b # -->beg of subr list
+ xor a, a
+ jmp 1f
+0: lea 8(b), b
+1: cmpl $0, (b) # end of list ?
+ je 2f # yes
+ cmp 4(b), d # -->subr > wanted addr ?
+ jb 0b # yes, next
+ cmp 4(b), a # -->subr > last found ?
+ jnb 0b # no, next
+ mov (b), c # get subr name
+ mov 4(b), a # get subr addr
+ jmp 0b # next
+2: test a, a
+ jnz 1f
+ push d
+ push $0f
+ push stderr
+ call fprintf
+ jmp D.subrR
+0: .ascii "addr %p not found in subroutines\n\0"
+1: sub a, d
+ push d # offset in subr
+ push c # subroutine name
+ pushl wAddr(bp) # wanted addr
+ push $0f
+ push stderr
+ call fprintf
+ jmp D.subrR
+0: .ascii "searched addr %p in subroutine %s at offset %x\n\0"
+D.subrR:
+ EPILOG
+#-----------------------------------------------
+ .end
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSa32/Makefile Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,17 @@
+O := CS.o Debug.o Node.o Data.o
+H := DS.S
+
+ASFLAGS := -Wall --gstabs -am --32
+LDFLAGS := -m elf_i386 -lc -lm -lpthread -lrt -lssl -lcrypto
+
+.PHONY: all clean
+all: CS
+
+CS: $(O) Makefile
+ ld $(LDFLAGS) $(O) -o $@
+
+%.o: %.S $(H) Makefile
+ as $(ASFLAGS) -a=$*.lst -o $@ $<
+
+clean:
+ rm -f CS $(O)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSa32/Node.S Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,1616 @@
+ .include "DS.S"
+ .data
+ .text
+#-----------------------------------------------
+# N O D E O P E R A T I O N S
+#-----------------------------------------------
+ ARGS
+ DS CnstlnP # -->Constellation inst
+ DS port # node's port#
+# returns
+# nothing
+# local vars
+ PROLOC
+ DL this, NodeL # this Node instance
+ DL thisP # -->this Node
+ DL deP # -->Debug inst for LOG macro
+ DL rc
+ DL tosignal # switch
+ DL sigact, SigActionL
+ DL sigmask, 128
+ DL sigret
+ EPILOC
+#-----------------------------------------------
+ .global Node
+Node:
+ PROLOG
+ lea this(bp), b # -->this Node
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save -->Debug locally for LOG macro
+# save parm values
+ mov CnstlnP(bp), c # -->Constellation
+ mov c, N.cnstlnP(b) # save -->Constellation
+ mov port(bp), a
+ mov a, N.locPort(b) # save port#
+# set Debug id
+ push a
+ cmpl $Cn.ring, Cn.topo(c)
+ je 0f
+ push $8f
+ jmp 1f
+0: push $7f
+1: cmp $0,Cn.ssl(c)
+ je 2f
+ push $6f
+ jmp 9f
+2: push $5f
+ jmp 9f
+5: .asciz "non"
+6: .asciz ""
+7: .asciz "RING"
+8: .asciz "MASH"
+9: DEBID "%sSSL %s node %d", 3 # N->ssl ? "" : "non", N.topo==mash ? "MASH" : "RING", locPort
+ call N.init # initialize node
+ call N.bind # bind node to local port
+ push N.kicker(b) # kicker indicator
+ LOG 5, "kicker=%d"
+ cmpl $0, N.kicker(b) # kicker ?
+ je 0f # no
+ call N.kickOff # start transfer
+0:
+ call N.mainLoop # iterate on socket I/O select
+ LOG 5, "closing ssc"
+ push N.ssc(b)
+ SYS close
+ movl $0, rc(bp) # rc = OK
+# wait on thread closing client side
+ cmpl $0, N.closing(b) # closing client side ?
+ je N.chkData # no
+ push $0
+ push N.ptid(b) # thread id
+ call pthread_join
+ test a, a
+ jz 1f
+ push a # unique error handlig for pthread_join
+ call strerror
+ push a
+ LOG 0, "pthread_join: %s"
+ push deP(bp)
+ call C.abend
+1:
+ mov thisP(bp), b # -->this Node
+ push N.pid(b)
+ LOG 5, "task %u: closing thread returned"
+# synchronize with all nodes in all constellations and then check received data
+N.chkData:
+ call N.sync # wait for others
+ cmpl $0, N.kicker(b) # kicker ?
+ je 0f # no
+
+ push N.dataP(b) # -->Data
+ call Da.chk
+ test a, a # data check OK ?
+ jnz 0f # yes
+ movl $1, rc(bp) # rc = BAD
+ LOG 0, "data on INPUT TO and OUTPUT FROM constellation DIFFER"
+0:
+ push N.pid(b)
+ push rc(bp)
+ LOG 2, "end of operations, rc=%d, process=%u"
+ call exit
+#-----------------------------------------------
+# C O N S T R U C T O R
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug
+# DL CnstlnP # -->Constellation
+ DL len # various lengths
+ strL = 128
+ DL str, strL # string buf
+ EPILOC
+#-----------------------------------------------
+N.init: # c: -->Cnstln
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ LOG 4, "initializing..."
+# initialize node's attributes
+ call getpid
+ mov a, N.pid(b)
+ mov N.cnstlnP(b), c # -->Constellation
+ push deP(bp) # -->caller Debug id
+ lea N.data(b), a
+ mov a, N.dataP(b)
+ push a # -->Data
+ call Data # initialize data item
+ mov Cn.topo(c), a
+ mov a, N.topo(b) # save topology
+ mov Cn.ssl(c), a
+ mov a, N.ssl(b) # SSL switch
+ mov Cn.first(c), a
+ mov a, N.first(b) # save port# of first node in constellation
+ movl $0, N.kicker(b) # clear kicker switch
+ cmp a, N.locPort(b) # kicker ? (locPort == first)
+ sete N.kicker(b)
+1: add Cn.nodes(c), a
+ dec a
+ mov a, N.last(b) # save port# of last node (last = first + nodes -1)
+ mov Cn.nodes(c), a
+ mov a, N.nodes(b) # save num. of nodes in constellation
+ mov Cn.div(c), a
+ mov a, N.div(b) # save divisor for random choice (MAX_INT / nodes)
+ mov Cn.topo(c), a
+ mov a, N.topo(b)
+ mov Cn.forwP(c), a
+ mov a, N.forwP(b) # save -->shared "forward" indicator
+ movl $0, N.closing(b) # unset closing indicator
+# allocate and initialize server side and client side sockets
+ mov $SocketInfoL, a
+ mull N.nodes(b) # SocketInfo array len
+ mov a, N.sockArrLen(b) # save array len
+ push a
+ call malloc
+ cmp $0, a
+ jnl 0f
+ SYSERR "malloc"
+0: mov thisP(bp), b # -->this Node
+ mov a, N.srvSideP(b) # save -->array of server side sockets
+ pushl N.sockArrLen(b)
+ call malloc
+ cmp $0, a
+ jnl 0f
+ SYSERR "malloc"
+0: mov thisP(bp), b # -->this Node
+ mov a, N.cliSideP(b) # save -->array of client side sockets
+# initialize allocated socket infos
+ xor c, c # offset
+0: mov N.srvSideP(b), a # -->server side SocketInfo array
+# movl $So.labelV, So.label(a, c) # indicate SocketInfo block
+ movl $-1, So.sc(a, c) # indicate server side socket is not in use
+ mov N.cliSideP(b), a # -->client side SocketInfo array
+# movl $So.labelV, So.label(a, c) # indicate SocketInfo block
+ movl $-1, So.sc(a, c) # indicate client side socket is not in use
+ lea SocketInfoL(c), c # incr by SocketInfo len
+ cmp c, N.sockArrLen(b)
+ ja 0b # iterate
+
+ cmpl $0, N.ssl(b) # node's ssl switch
+ jz N.initR # no SSL
+# prepare SSL context
+# call SSL_load_error_strings
+# call SSL_library_init
+ call OPENSSL_init_ssl
+ LOG 4, "setting SSL contex...", 0
+ call TLS_method
+ push a
+ call SSL_CTX_new
+ mov thisP(bp), b # -->this Node
+ mov a, N.ctxP(b) # save -->new SSL context
+ test a, a
+ jnz 0f
+ SSLERR "new SSL CTX"
+# set SSL mode
+0: pushl $0
+ pushl $4 # SSL_MODE_AUTO_RETRY
+ pushl $33 # SSL_CTRL_MODE
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_CTX_ctrl
+ movl $0, 8(sp) # no callback
+ movl $2, 4(sp) # mode = SSL_VERIFY_FAIL_IF_NO_PEER_CERT
+ call SSL_CTX_set_verify # set peer certificate verification parameters
+# set X509 key file name
+ mov thisP(bp), b # -->this Node
+ pushl N.locPort(b) # local TCP port#
+ mov C.csP, c
+ pushl C.cePathP(c) # -->name of SSL path
+ pushl $0f # -->format
+ pushl $strL # buf len
+ lea str(bp), a
+ push a # -->string buf
+ call snprintf
+ jmp 1f
+0: .asciz "%skeys/%u.key"
+1:
+ pushl $1 # SSL_FILETYPE_PEM
+ lea str(bp), a # filename string
+ push a
+ LOG 5, "SSL private key used: %s", 1
+ mov thisP(bp), b # -->this Node inst
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_CTX_use_PrivateKey_file
+ cmp $1, a
+ je 0f
+ SSLERR "hh's key file"
+# set X509 cert file name
+0: mov thisP(bp), b # -->this Node
+ pushl N.locPort(b) # local TCP port #
+ mov C.csP, c
+ pushl C.cePathP(c) # -->name of SSL path
+ pushl $0f
+ pushl $strL # buf len
+ lea str(bp), a # string buf
+ push a
+ call snprintf
+ jmp 1f
+0: .asciz "%scerts/%u.pem"
+1: mov thisP(bp), b # -->this Node
+ LOG 5, "SSL certificate used: %s", 1
+ pushl $1 # SSL_FILETYPE_PEM
+ lea str(bp), a # filename string
+ push a
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_CTX_use_certificate_file
+ cmp $1, a
+ je 0f
+ SSLERR "hh's cert file"
+# set path to CA
+0: mov thisP(bp), b # -->this Node
+ mov C.csP, c
+ pushl C.caPathP(c) # -->name of SSL path
+ pushl $0f # -->CApath
+ LOG 5, "SSL: CA path: %s", 1
+ pushl $0 # -->CAfile not used
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_CTX_load_verify_locations # set default locations for trusted CA certificates
+ cmp $1, a
+ je N.initR
+ SSLERR "hh's thrusted certs path"
+0: .asciz "/home/local/etc/ssl/certs/"
+N.initR:
+ LOG 5, "initalized"
+ EPILOG
+#-----------------------------------------------
+# K I C K O F F
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug
+ DL sci # socket info rank
+ DL digest, 24
+ EPILOC
+N.kickOff:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+# load payload data
+ movl C.csP, c # -->CS
+ push C.txtP(c) # -->initial payload text
+ push C.ttl(c) # initial TTL
+# lea N.data(b), a # -->Data
+# push a
+ push N.dataP(b) # -->Data
+ call Da.load # load data container
+ call N.nextNode
+ mov a, N.next(b) # save next node#
+ sub N.first(b), a # first - next = socketinfo rank
+ mov a, sci(bp) # save socket info rank
+ lea digest(bp), a # -->digest buf
+ push a
+ push N.dataP(b) # -->Data
+ call Da.digest24
+ call Da.getDataLen
+ push N.next(b) # next node#
+ push a # data len
+ lea digest(bp), a # -->digest
+ push a
+ LOG 2, "kicker: ready to initial send %s, len=%u to node %u"
+ push N.next(b)
+ push sci(bp)
+ call N.conn # connect to next node
+ push sci(bp)
+ call N.put # send data to next node
+ EPILOG
+#-----------------------------------------------
+# M A I N L O O P : I T E R A T E O N S O C K E T I / O
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug
+ EPILOC
+N.mainLoop:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+# prepare
+ LOG 5, "preparing for I/O select..."
+ call N.clearSocketMasks # clear socket masks for select and zero nfds
+ call N.maskSsc # mask ssc for select
+# iterate while there are some sockets masked for select (nfds > 0)
+0: cmp $0, N.nfds(b) # any I/O in progress ?
+ jz 0f # no, end operations of node
+ call N.selectSocketIo # select on server side sockets and forward data
+ call N.clearSocketMasks # clear socket masks for select and zero nfds
+ call N.maskSsc # mask ssc for select
+ call N.maskSrvSockets # mask all connected server side sockets for select
+ jmp 0b
+0: EPILOG
+#-----------------------------------------------
+# S E L E C T S O C K E T I / O
+#-----------------------------------------------
+# select on masked sockets
+# upon return from select
+# accept connections to ssc
+# forward data from posted sockets
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug
+ EPILOC
+N.selectSocketIo:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+
+ push N.nfds(b)
+ lea N.rs(b), a
+ push (a)
+ push 4(a)
+ push N.ssc(b)
+ LOG 5, "select ssc=%u, mask=%08x %08x, nfds=%u"
+ lea N.t(b), a # -->timeval
+ movl $1, Ti.secs(a)
+ movl $0, Ti.usecs(a)
+ push a
+ lea N.es(b), a
+ push a # -->es (exceptional mask is not used yet)
+ pushl $0
+ lea N.rs(b), a # -->rs
+ push a
+ push N.nfds(b)
+ SYS select
+ lea N.rs(b), a
+ push (a)
+ push 4(a)
+ LOG 5, "return from select, mask of posted=%08x %08x"
+# check ssc for incomming connect request, accept it and forward data
+ lea N.rs(b), a # -->rs select mask
+ push a
+ pushl N.ssc(b)
+ call fd_isset
+ test a, a # ssc I/O ?
+ jz N.checkAndForw # no, check other sockets
+# prepare for accept; find free srv socket info block for accept
+ mov N.srvSideP(b), a # -->srv socket info array
+ xor c, c # offet in socket info array
+ xor d, d # socket info rank
+0: cmp $-1, So.sc(a, c) # socket allocated ?
+ je 0f # no, free socket found
+ inc d
+ lea SocketInfoL(c), c # inc offset into array
+ cmp c, N.sockArrLen(b)
+ ja 0b # iterate on sockets
+ ERR "can't accept, all sockets in use"
+# free socket found, accept connection on it and forward data
+0: push d
+ LOG 4, "slot for accept=%d"
+ call N.acc
+ call N.forw
+# check all server side sockets for pending I/O and call forward on them
+N.checkAndForw:
+ xor c, c # offet into socket info array
+ lea N.rs(b), a # -->rs select mask
+ push a
+ lea -4(sp), sp # adjust stacker for iteration
+ xor c, c # zero offset
+ xor d, d # set counter
+# iterate through server side sockets and forward from posted sockets
+0: mov N.srvSideP(b), a # -->srv socket info array
+ cmp $-1, So.sc(a, c) # socket connected ?
+ je 1f # no, iterate
+ mov So.sc(a, c), a
+ mov a, (sp) # stack socket#
+# LOG 0, "checking port %u"
+ call fd_isset # socket I/O ?
+ test a, a
+ jz 1f # no, iterate
+ mov N.srvSideP(b), a
+ mov So.sc(a, c), a
+ mov d, (sp) # stack socket rank
+# LOG 0, "port posted, rank=%u"
+ call N.forw # forward data
+1: inc d
+ lea SocketInfoL(c), c # inc offset into array
+ cmp c, N.sockArrLen(b)
+ ja 0b # iterate on srv side sockets
+ EPILOG
+#-----------------------------------------------
+# M A S K C O N N E C T E D S O C K E T S F O R S E L E C T
+#-----------------------------------------------
+# mask all connected server side socketS for next select
+ PROLOC
+ DL thisP # -->this Node
+ EPILOC
+N.maskSrvSockets:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+
+ lea N.rs(b), a # -->rs select mask
+ push a
+ lea -4(sp), sp # adjust stacker for iteration
+ xor c, c # zero offet
+0: mov N.srvSideP(b), a # -->srv socket info array
+ cmp $-1, So.sc(a, c) # socket connected ?
+ je 1f # no, iterate
+ mov So.sc(a, c), a
+ mov a, (sp) # stack socket#
+ call N.maskSocket # mask socket for select
+1: lea SocketInfoL(c), c # inc offset into array
+ cmp c, N.sockArrLen(b)
+ ja 0b # iterate on srv side sockets
+ EPILOG
+#-----------------------------------------------
+# M A S K S S C F O R S E L E C T
+#-----------------------------------------------
+# mask ssc for select until forward is disabled
+ PROLOC
+ DL thisP # -->this Node
+ EPILOC
+N.maskSsc:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+
+ mov N.forwP(b), a
+ cmp $1, (a) # forwarding enabled ?
+ jne 0f # no, return
+ push N.ssc(b)
+ call N.maskSocket
+0: EPILOG
+#-----------------------------------------------
+# C L E A R S O C K E T M A S K S
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ EPILOC
+N.clearSocketMasks:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+
+ movl $0, N.nfds(b) # zero nfds
+ lea N.rs(b), a # zero rs select mask
+ push a
+ call fd_zero
+ lea N.es(b), a # zero es select masks
+ push a
+ call fd_zero
+ EPILOG
+#-----------------------------------------------
+# M A S K S O C K E T F O R S E L E C T
+#-----------------------------------------------
+ ARGS
+ DS sc # socket#
+ PROLOC
+ DL thisP # -->this Node
+ EPILOC
+N.maskSocket:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+
+ lea N.rs(b), a # -->rs select mask
+ push a
+ pushl sc(bp)
+ call fd_set
+ mov sc(bp), a # socket#
+ cmp a, N.nfds(b) # nfds > sc ?
+ jg 0f # yes
+ inc a
+ mov a, N.nfds(b) # nfds = sc + 1
+0: EPILOG
+#-----------------------------------------------
+# F D S E T O P E R A T I N O S
+#-----------------------------------------------
+ PROLOC # local data frame def. is used by 4 following subrs
+ DL thisP # -->this Node
+ dL deP
+ EPILOC
+fd_zero:
+# zero FD mask
+ ARGS
+ DS fdsetP # -->FD mask
+ PROLOG
+ mov fdsetP(bp), a
+ xor c, c
+0: movl $0, (a, c, 4)
+ inc c
+ cmp $32, c # size of FD mask is 32 int (1024 bits)
+ jl 0b
+ EPILOG
+fd_set:
+# mask socket in FD mask
+ ARGS
+ DS sc
+ DS fdsetP # -->FD mask
+ PROLOG
+ movl sc(bp), a # sc
+ call fd_mask # a: integer offset, d: "1" bit in position according to sc
+ mov fdsetP(bp), c # -->select mask
+ or d, (c, a) # set sc mask
+ EPILOG
+fd_clear:
+# clear socket from FD mask
+ ARGS
+ DS sc
+ DS fdsetP # -->FD mask
+ PROLOG
+ movl sc(bp), a # sc
+ call fd_mask # a: integer offset, d: "1" bit in position according to sc
+ mov fdsetP(bp), c # -->select mask
+ not d
+ and d, (c, a) # clear sc mask
+ EPILOG
+fd_isset:
+# test socket bit status in FD mask
+ ARGS
+ DS sc
+ DS fdsetP # -->FD mask
+ PROLOG
+ movl sc(bp), a # sc
+ call fd_mask # a: integer offset, d: "1" bit in position according to sc
+ mov fdsetP(bp), c # -->select mask
+ and (c, a), d # sc selected ?
+ mov d, a
+# mov $0, a
+# jz 0f # not selected
+# mov $1, a
+0: EPILOG_R
+fd_mask: # a: sc
+# adjust offsets into FD mask
+ xor d, d
+ movl $32, c
+ divl c # a: mask integer rank, c: bit offset
+ shll $2, a # a: mask integer offset
+ mov d, c
+ mov $1, d
+ shl %cl, d # d: "1" bit in position according to sc
+ ret # a: offset of mask integer
+#-----------------------------------------------
+# F O R W A R D D A T A T O O T H E R N O D E
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug inst
+ DL digest, 24
+ EPILOC
+#-----------------------------------------------
+N.forw:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+
+ push sci(bp) # socket info rank
+ call N.get
+ test a, a # data read ?
+ jz N.forwCloseSock # no, EOF, close this socket and start closing clients
+# kicker checks TTL
+ cmp $1, N.kicker(b) # kicker ?
+ jne 1f # no, continue forwarding
+
+ lea -4(sp), sp # adjust stacker
+ lea digest(bp), a
+ push a # -->digest buf
+ push N.dataP(b) # -->Data
+ call Da.ttl
+ mov a, 8(sp) # TTL from data header
+ call Da.digest24
+ mov a, 4(sp) # -->text digest
+ call Da.getPort
+ mov a, (sp) # remote listen port from data header
+ LOG 5, "received from node %u: %s, ttl=%d"
+ push N.dataP(b) # -->Data
+ call Da.dttl # decrement TTL in data
+ test a, a # TTl > 0 ?
+ jz N.forwKickerStop # no, disable forwarding in constellation
+# forward data
+1: call N.nextNode # calculate next node
+ mov a, N.next(b)
+ push a
+ LOG 4, "next node %u"
+ sub N.first(b), a # socket info rank (next - first)
+ mov a, sci(bp) # save
+# connect if not connected yet
+ movl $SocketInfoL, d
+ mul d # a: offset into socket info array
+ mov N.cliSideP(b), c # -->client side socket info array
+ cmp $-1, So.sc(c, a) # connected ?
+ jne 0f # yes
+ push N.next(b)
+ push sci(bp)
+ call N.conn
+0:
+ lea -4(sp), sp
+ push N.dataP(b) # -->Data
+ call Da.ttl
+ mov a, 4(sp) # ttl
+ call Da.getDataLen
+ mov a, (sp) # container len
+ push N.next(b) # next node#
+ LOG 5, "forwarding to %d, len=%d, ttl=%d --->"
+# pacing
+ mov C.csP, c # -->CS
+ cmp $0, C.pacing(c) # pacing active ?
+ je 0f # no
+ pushl $0
+ lea C.pace.tv_sec(c), a
+ push a
+ SYS nanosleep # pace
+0: push sci(bp)
+ call N.put
+ LOG 4, "leaving forward"
+ jmp N.forwR # exit forwarding
+N.forwKickerStop: # ttl = 0
+ lea digest(bp), a
+ push a # -->digest buf
+ push N.dataP(b) # -->Data
+ call Da.digest24
+ push a
+ LOG 1, "received after finally passing all: %s", 1
+ mov N.forwP(b), a # -->shared forw indicator
+ movl $0, (a) # disable forwarding for all nodes in constellation
+ jmp N.forwCloseCli
+N.forwCloseSock:
+ pushl deP(bp)
+ pushl $1 # indicate "server side"
+ push sci(bp) # socket info rank
+ call N.closeSocket # close this server side socket
+N.forwCloseCli:
+ call N.closeClients # launch client side closing thread
+ LOG 4, "leaving forward, closing"
+N.forwR:
+ EPILOG
+#-----------------------------------------------
+# D E T E R M I N E N E X T N O D E
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ EPILOC
+N.nextNode:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+
+ cmp $Cn.ring, N.topo(b) # ring ?
+ jne 0f # no
+# ring
+ mov N.locPort(b), a
+ inc a # next = locPort + 1
+ cmp a, N.last(b) # next > last ?
+ jnl N.nextNodeR
+ mov N.first(b), a
+ jmp N.nextNodeR
+# mash
+0: call random
+ xor d, d
+ mov thisP(bp), b
+ divl N.div(b) # 0 <= random < nodes
+ add N.first(b), a # first + random
+ cmp a, N.locPort(b)
+ je 0b # iterate until other then local node is selected
+N.nextNodeR:
+ push a
+ EPILOG_R
+
+#-----------------------------------------------
+# B I N D T O L O C A L T C P P O R T
+#-----------------------------------------------
+# returns
+# nothing
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug
+ DL i
+ DL aiP # -->IP addrinfo
+ strL = 64
+ DL str, strL # string buf
+ EPILOC
+#-----------------------------------------------
+ .global N.bind
+N.bind:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ LOG 4, "binding...", 0
+# prepare IP addr
+ pushl $AddrInfoL
+ call malloc # alloc addrinfo for hints
+ mov a, aiP(bp)
+ pushl $AddrInfoL
+ pushl $0
+ push a
+ call memset # clear hints
+
+ movl $2, ai_family(a) # AF_INET
+ movl $1, ai_flags(a) # AI_PASSIVE for bind
+ movl $0, ai_protocol(a) # any protocol
+ movl $1, ai_socktype(a) # SOCK_STREAM, blocking type
+
+ mov thisP(bp), b # -->this Node
+ pushl N.locPort(b) # local port #
+ push $0f
+ lea str(bp), a
+ push a # -->str buff
+ call sprintf
+ jmp 1f
+0: .ascii "%d\0"
+1:
+ lea aiP(bp), a
+ push a # -->-->addrinfo for new addrinfo
+ pushl aiP(bp) # -->addrinfo hints
+ lea str(bp), a
+ push a # -->loc port # string
+ pushl $0f
+ call getaddrinfo
+ jmp 1f
+0: .ascii "localhost\0"
+# check IP addr
+1: cmp $0, a # getaddrinfo OK ?
+ jz 0f # yes
+ push a
+ call gai_strerror
+ push a
+ LOG 0, "getaddrinfo error: %s, ABEND", 1
+ push $1
+ call exit
+# log assigned IP addr
+0: pushl deP(bp)
+ mov aiP(bp), a # -->addrinfo
+ pushl ai_addrP(a) # -->sockaddr
+ lea str(bp), a
+ push a # -->string buf
+ call gai
+
+ lea str(bp), a
+ push a
+ LOG 5, "getaddrinfo OK, %s", 1
+# allocate socket
+ mov aiP(bp), a
+ pushl ai_protocol(a)
+ pushl ai_socktype(a)
+ pushl ai_family(a)
+ SYS socket
+ mov a, N.ssc(b) # save ssc
+# set socket option SO_REUSEADDR
+ pushl $4 # integer width
+ lea 9f, a # value 1
+ push a
+ pushl $2 # SO_REUSEADDR
+ pushl $1 # SOL_SOCKET
+ push N.ssc(b) # ssc
+ SYS setsockopt
+ jmp 8f
+9: .long 1
+8:
+# bind
+ mov aiP(bp), a
+ pushl ai_addrlen(a)
+ pushl ai_addrP(a)
+ push N.ssc(b) # ssc
+ SYS bind
+# listen
+ pushl $1 # pend conns queue len
+ pushl N.ssc(b) # ssc
+ SYS listen
+ pushl N.locPort(b)
+ LOG 2, "bound to %d", 1
+N.bindR:
+ EPILOG
+#-----------------------------------------------
+# C O N N E C T T O R E M O T E N O D E
+#-----------------------------------------------
+# args
+ ac = 40
+ DS i # socket rank
+ DS remPort # TCP port# on remote site
+# returns
+# nothing
+# local vars
+ ac = 0
+ DL debug, DebugL # Debug inst
+ DL deP # -->Debug inst
+ DL retry # retry counter
+ DL ai, AddrInfoL # IP addrinfo
+ DL aiP # -->IP addrinfo
+ DL str, 64 # string buf
+ DL currSockP # -->SocketInfo save area
+ locL = ac
+#-----------------------------------------------
+ .global N.conn
+N.conn:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea debug(bp), a # -->own local debug block
+ mov a, deP(bp) # save -->local Debug
+ pushl remPort(bp) # remote port
+ lea N.debug(b), d # -->node's Debug
+ lea D.id(d), a # -->node's debug ID
+ push a
+ DEBID "%s to %u", 2 # set local debug ID
+ LOG 3, "connecting...", 0
+
+ mov N.cliSideP(b), c # -->SocketInfo array
+ mov $SocketInfoL, a
+ mull i(bp) # SocketInfo offset
+ lea (c, a), c # -->curr SocketInfo
+ mov c, currSockP(bp) # save -->curr SocketInfo
+ mov remPort(bp), a
+ mov a, So.remPort(c) # put remote port # into curr SocketInfo
+# prepare IP addrinfo hints
+ pushl $AddrInfoL # IP addrinfo length
+ pushl $0
+ lea ai(bp), a
+ mov a, aiP(bp) # -->IP addrinfo
+ push a
+ call memset # clear addrinfo
+ movl $2, ai_family(a) # AF_INET
+ movl $0, ai_flags(a)
+ movl $0, ai_protocol(a) # any protocol
+ movl $1, ai_socktype(a) # SOCK_STREAM
+# set up port # string
+ mov currSockP(bp), a # -->curr socket info
+ pushl So.remPort(a) # remote port#
+ pushl $0f
+ lea str(bp), a
+ push a # -->port# string
+ call sprintf
+ jmp 1f
+0: .ascii "%u\0"
+# get IP addrinfo block chain
+1: lea aiP(bp), a # -->-->IP addrinfo
+ push a
+ pushl aiP(bp) # -->IP addrinfo hints
+ lea str(bp), a # -->remote port # string
+ push a
+ pushl $0f
+ call getaddrinfo
+ jmp 1f
+0: .ascii "localhost\0"
+# check IP addr
+1: cmp $0, a # getaddrinfo OK ?
+ jz 0f # yes
+ push a
+ call gai_strerror
+ push a
+ LOG 0, "getaddrinfo error: %s, ABEND", 1
+ push $1
+ call exit # ABEND
+# log assigned IP addr
+0: pushl deP(bp) # -->Debug inst
+ mov aiP(bp), a # -->IP addrinfo
+ pushl ai_addrP(a) # -->sockaddr
+ lea str(bp), a
+ push a # -->string buf
+ call gai
+ LOG 5, "getaddrinfo OK, %s", 1
+# allocate comm socket
+ mov aiP(bp), a # -->IP addrinfo
+ pushl ai_protocol(a)
+ pushl ai_socktype(a)
+ pushl ai_family(a)
+# call socket # allocate comm socket
+# cmp $0, a
+# jg 0f
+# SYSERR "socket alloc"
+#0:
+ SYS socket
+ mov currSockP(bp), c # -->curr SocketInfo
+ mov a, So.sc(c) # put new socket # into SocketInfo
+# connect to remote partner server side
+ mov C.csP, a
+ mov C.connTh(a), a # conn retry threshold
+ mov a, retry(bp) # initilize retry counter
+ mov aiP(bp), a # -->IP addrinfo
+ pushl ai_addrlen(a)
+ pushl ai_addrP(a)
+ pushl So.sc(c) # socket #
+2: call connect
+ cmp $0, a
+ jnl 0f # connected
+ call __errno_location # get --> sys errno
+ cmp $111, (a) # ECONNREFUSED ?
+ je 1f # yes, wait to retry
+ SYSERR "connect" # no, ABEND
+1: pushl $22000 # 22 msecs
+ call usleep
+ lea 4(sp), sp
+ sub $1, retry(bp)
+ jnz 2b # iterate
+# conn retry threshold reached
+ mov C.csP, a
+ pushl C.connTh(a)
+ LOG 0, "connection refused, threshold %d reached", 1
+ pushl $1
+ call exit # ABEND
+0:
+# connected, account new connection
+ mov C.csP, c # -->CS
+ mov C.shP(c), a # -->shared mem
+ lea S.counter_sem(a), a # -->overall counter semaphore
+ push a
+ SYS sem_wait
+ mov C.csP, c # -->CS
+ mov C.shP(c), a # -->shared mem
+ incl S.conns(a) # incr conns counter
+ lea S.counter_sem(a), a # -->overall msg counter semaphore
+ push a
+ SYS sem_post
+# prepare SSL
+ cmp $1, N.ssl(b) # node's ssl switch
+ jne N.connRet # no SSL
+
+ LOG 5, "prepare for SSL"
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_new # create a SSL structure
+
+ mov currSockP(bp), c # -->curr SocketInfo
+ mov a, So.sslP(c) # put -->SSL struct into SocketInfo
+ test a, a
+ jnz 0f
+ SSLERR "new SSL"
+# connect on SSL
+0: call ERR_clear_error
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sc(c) # socket
+ pushl So.sslP(c) # -->SSL struct
+ call SSL_set_fd # connect the SSL object with a file descriptor
+ test a, a
+ jnz 0f
+ SSLERR "client SSL set fd"
+0:
+ LOG 5, "SSL fd set"
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sslP(c) # -->SSL struct
+ call SSL_connect
+ test a, a # connected ?
+ jns 1f # yes
+ push a
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sslP(c) # -->SSL struct
+ call SSL_get_error # SSL_get_error(-->ssl, err)
+ cmp $5, a # SSL_ERROR_SYSCALL ?
+ jne 0f # no, other SSL err
+ SYSERR "SSL connect"
+0: SSLERR "SSL connect"
+1:
+ mov C.csP, a
+ mov C.connTh(a), a
+ sub retry(bp), a # # of retries = conn threshold - counter
+ push a
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sc(c) # socket#
+ LOG 2, "connected via sc=%d after %d retries", 2
+N.connRet:
+ EPILOG
+#-----------------------------------------------
+# A C C E P T C O N N E C T I O N
+#-----------------------------------------------
+ ARGS
+ DS i # socket rank
+# returns
+# nothing
+# local vars
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug inst
+ DL sa, SockAddrL # sockaddr
+ DL str, 64 # string buf
+ DL currSockP # -->curr SocketInfo save area
+ EPILOC
+#-----------------------------------------------
+ .global N.acc
+N.acc:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ LOG 4, "accepting..."
+
+ mov N.srvSideP(b), c # -->SocketInfo array
+ mov $SocketInfoL, a
+ mull i(bp) # curr SocketInfo offset
+ lea (c, a), a # -->curr SocketInfo
+ mov a, currSockP(bp) # save -->curr SocketInfo
+# accept
+ pushl $0
+ pushl $0
+# mov thisP, b # -->Node inst
+ pushl N.ssc(b) # listen socket
+# call accept
+# cmp $0, a
+# jnl 0f
+# SYSERR "accept"
+#0:
+ SYS accept
+ mov currSockP(bp), c # -->Node inst
+ mov a, So.sc(c) # save comm socket of accepted connection
+ pushl $0f # -->sa length
+ lea sa(bp), a # -->sockaddr
+ push a
+ pushl So.sc(c) # comm socket
+ call getpeername
+ jmp 1f
+0: .int SockAddrL
+1: pushl deP(bp)
+ lea sa(bp), a # -->sockaddr
+ push a
+ lea str(bp), a # -->string buf
+ push a
+ call gai # get string of addr/port
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sc(c) # comm socket
+ LOG 3, "peer: sc=%d, %s", 2
+# prepare SSL
+ mov thisP(bp), b # -->this Node
+ cmp $1, N.ssl(b) # node's ssl switch
+ jne 1f # no SSL
+
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_new # create a SSL structure
+ mov currSockP(bp), c # -->curr SocketInfo
+ mov a, So.sslP(c) # put -->SSL struct into SocketInfo
+ test a, a
+ jnz 0f
+ SSLERR "new SSL"
+# accept on SSL
+0: call ERR_clear_error
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sc(c) # socket #
+ pushl So.sslP(c) # -->SSL struct
+ call SSL_set_fd # connect the SSL object with a file descriptor
+ test a, a
+ jnz 0f
+ SSLERR "client SSL set fd"
+0:
+ LOG 5, "SSL fd set"
+ call SSL_accept
+ test a, a
+ jns 1f
+ mov a, 4(sp)
+ call SSL_get_error # SSL_get_error(-->ssl, err)
+ cmp $5, a # SSL_ERROR_SYSCALL ?
+ jne 0f # no, other SSL err
+ SYSERR "SSL accept"
+0: SSLERR "SSL accept"
+1:
+ LOG 2, "accepted"
+ EPILOG
+#-----------------------------------------------
+# R E A D F R O M C O M M S O C K E T
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+# returns length read
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug inst
+ DL shP # -->shared mem
+ DL currSockP # -->curr SocketInfo
+ DL n # len read
+ DL rest # len to be read
+ DL buf # read dest
+ EPILOC
+#-----------------------------------------------
+ .global N.read
+N.read:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+
+ mov C.csP, a # -->CS
+ mov C.shP(a), a # -->shared mem
+ mov a, shP(bp) # save -->shared mem
+ push N.dataP(b)
+ call Da.getDataLen
+ mov a, rest(bp) # init len to be read
+ call Da.getContP
+ mov a, buf(bp) # init read dest
+ mov N.srvSideP(b), c # -->server side SocketInfo array
+ mov $SocketInfoL, a
+ mull sci(bp) # curr SocketInfo offset
+ lea (c, a), c # -->curr SocketInfo
+ mov c, currSockP(bp) # save -->curr SocketInfo
+
+ push So.sc(c) # comm socket
+ push rest(bp) # len to be read
+ LOG 5, "ready to read len=%d from sc=%d..."
+# iterate on reading until whole length is read
+N.readIterate:
+ cmp $0, rest(bp) # something to be read ?
+ jng N.readAcc # no, end reading
+ push rest(bp) # len to be read
+ push buf(bp) # read dest
+ mov currSockP(bp), c # curr SocketInfo
+ cmp $1, N.ssl(b) # SSL ?
+ jne 0f # no
+# read w/ SSL
+ push So.sslP(c) # -->SSL struct
+ SYS SSL_read
+ jmp 1f
+# read w/o SSL
+0:
+ push So.sc(c) # socket #
+ SYS read
+# adjust read controls
+1:
+ jz N.readEof # EOF read
+ add a, buf(bp) # adjust -->read dest
+ sub a, rest(bp) # decrement rest to read
+ cmp $0, rest(bp) # everything read ?
+ jng 0f
+ push a
+ LOG 5, "partly read %d bytes"
+0: jmp N.readIterate
+# account data just read
+N.readAcc:
+ mov shP(bp), a # -->shared mem
+ lea S.counter_sem(a), a # overall msg counter semaphore
+ push a
+ SYS sem_wait # get semaphore
+ mov shP(bp), a
+ incl S.msgs(a) # incr msgs counter
+ lea S.counter_sem(a), a # overall msg counter semaphore
+ push a
+ SYS sem_post # post semaphore
+
+ lea -4(sp), sp # adjust stacker
+ push N.dataP(b) # -->Data
+ call Da.getPort
+ mov a, 4(sp) # remote listen port
+ call Da.getDataLen
+ mov a, (sp) # data length
+ LOG 5, "%d read from node %u"
+ mov (sp), a # return len read
+ jmp N.readRet
+# EOF read, finish
+N.readEof:
+ LOG 4, "EOF read"
+N.readRet:
+ EPILOG_R
+#-----------------------------------------------
+# W R I T E T O C O M M S O C K E T
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+# returns length written
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug inst
+ DL currSockP # -->curr SocketInfo
+ DL n # len written
+ DL rest # len to be written
+ DL buf # write orig
+ EPILOC
+#-----------------------------------------------
+ .global N.write
+N.write:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+# init control values
+ mov C.csP, a # -->CS
+ mov C.shP(a), a # -->shared mem
+ mov a, shP(bp) # save -->shared mem
+ push N.dataP(b) # -->Data
+ call Da.getDataLen
+ mov a, rest(bp) # init len to be written
+ call Da.getContP
+ mov a, buf(bp) # init write orig
+ mov N.cliSideP(b), c # -->SocketInfo array
+ mov $SocketInfoL, a
+ mull sci(bp) # curr SocketInfo offset
+ lea (c, a), c # -->curr SocketInfo
+ mov c, currSockP(bp) # save -->curr SocketInfo
+
+ push So.sc(c) # comm socket
+ push rest(bp) # len to be written
+ LOG 5, "ready to write len=%d to sc=%d..."
+# iterate on writing until whole length is written
+N.writeIterate:
+ cmp $0, rest(bp) # something to be written ?
+ jng N.writeEnd # no, end writing
+ push rest(bp) # len to be written
+ push buf(bp) # write orig
+ cmp $1, N.ssl(b) # SSL ?
+ jne 0f # no
+# SSL
+ pushl So.sslP(c) # -->SSL struct
+ SYS SSL_write
+ jmp 1f
+# no SSL
+0:
+ pushl So.sc(c) # socket
+ SYS write
+# adjust write controls
+1:
+ mov a, n(bp) # save len written
+ add a, buf(bp) # adjust write orig
+ sub a, rest(bp) # decrement rest to write
+ cmp $0, rest(bp) # everything written ?
+ jng 0f
+ push a
+ LOG 5, "partly written %d bytes"
+0: jmp N.writeIterate
+# return datalen
+N.writeEnd:
+ mov N.dataP(b), a
+ mov Da.datalen(a), a
+ push a
+ LOG 5, "%d written", 1
+ EPILOG_R
+#-----------------------------------------------
+# G E T D A T A F R O M C O M M S O C K E T
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+# returns boolean: 0 - EOF read, 1 - data read
+ PROLOC
+ DL thisP # -->this Node
+ DL deP
+ EPILOC
+#-----------------------------------------------
+ .global N.get
+N.get:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ push sci(bp) # SocketInfo rank
+ LOG 5, "get data from socket, rank=%u"
+ call N.read
+ test a, a
+ jz 0f
+ mov $1, a
+0: EPILOG_R
+#-----------------------------------------------
+# P U T D A T A T O C O M M S O C K E T
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+# returns boolean: 0 - nothing written, 1 - data written
+ PROLOC
+ DL thisP # -->this Node
+ DL deP
+ EPILOC
+#-----------------------------------------------
+ .global N.put
+N.put:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ LOG 5, "put data to socket"
+
+ mov N.dataP(b), d # -->Data
+ mov Da.contP(d), a # -->container
+ lea Co.hdr(a), a # -->header
+ mov N.locPort(b), c
+ mov c, H.lport(a) # listen port to header
+
+ pushl sci(bp) # SocketInfo rank
+ call N.write
+ test a, a
+ jz 0f
+ mov $1, a
+0: EPILOG_R
+#-----------------------------------------------
+# C L O S E C O N N E C T I O N S O C K E T
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+ DS server # indicate server or client
+ DS deP # -->Debug inst
+# returns:# nothing
+ PROLOC
+ DL thisP # -->this Node
+ DL currSockP # -->curr SocketInfo
+ DL tagP
+ DL e
+ EPILOC
+N.srvSideTag: .asciz "server side"
+N.cliSideTag: .asciz "client side"
+#-----------------------------------------------
+ .global N.closeSocket
+N.closeSocket:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ mov N.srvSideP(b), c # -->SocketInfo array
+ movl $N.srvSideTag, tagP(bp)
+ cmp $1, server(bp) # server side ?
+ je 0f
+ mov N.cliSideP(b), c # client side
+ movl $N.cliSideTag, tagP(bp)
+0: mov $SocketInfoL, a
+ mull sci(bp) # curr SocketInfo offset
+ lea (c, a), c
+ mov c, currSockP(bp) # save -->curr SocketInfo
+ push So.sc(c) # socket to close
+ push tagP(bp)
+ LOG 5, "closing %s sc=%u...", 2
+
+ cmp $0, N.ssl(b) # SSL ?
+ je 2f # no
+# SSL shutdown
+ pushl So.sslP(c) # -->SSL structure
+ call SSL_shutdown
+ cmp $0, a
+ jns 0f # no err
+ SSLERR "SSL shutdown (1)"
+0: jnz 2f # SSL shutdown finished
+ LOG 5, "SSL shutdown rc=0, retry"
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sslP(c) # -->SSL structure
+ call SSL_shutdown
+ cmp $0, a
+ jg 2f # SSL shutdown finished
+ push a # SSL err
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sslP(c) # -->SSL structure
+ call SSL_get_error
+ cmp $5, a # SSL_ERROR_SYSCALL ?
+ jne 1f # no, other SSL err
+# syscall err
+ call ERR_get_error
+ test a, a # SSL err ?
+ jnz 2f # yes, don't care
+ call __errno_location # sys err ?
+ test a, a
+ jz 2f # no
+ SYSERR "SSL shutdown (2)"
+# other SSL err, print SSL err queue
+1: SSLERR "SSL shutdown (2)"
+# close socket
+2: mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sc(c) # socket to close
+# call close
+# test a, a
+# jz 0f
+# SYSERR "close"
+#0:
+ SYS close
+ push tagP(bp)
+ LOG 4, "%s sc=%u closed", 2
+ mov currSockP(bp), c # -->curr SocketInfo
+ movl $-1, So.sc(c) # indicate closed socket
+ EPILOG
+#-----------------------------------------------
+# C R E A T E T H R E A D T O C L O S E C L I E N T S I D E S
+#-----------------------------------------------
+ ARGS
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug inst
+ DL pt # pthread_t (int)
+ EPILOC
+#-----------------------------------------------
+N.closeClients:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ cmp $1, N.closing(b)
+ je 0f # already closing, nothing to do
+ LOG 5, "spawning thread to close client side"
+ push b # -->this (param for thread)
+ pushl $N.closeCliThread # -->thread func
+ pushl $0
+ lea N.ptid(b), a # -->thread ID
+ push a
+ call pthread_create
+ test a, a
+ jz 1f
+ SYSERR "create thread"
+1: mov thisP(bp), b # -->this Node
+ movl $1, N.closing(b) # indicate "closing"
+ push N.ptid(b)
+ LOG 5, "closing thread (%u) spawned"
+0: EPILOG
+#-----------------------------------------------
+# C L O S E C L I E N T S I D E S
+#-----------------------------------------------
+ ARGS
+ DS thisP
+ PROLOC
+ DL deP # -->Debug inst
+ DL debug, DebugL
+ EPILOC
+#-----------------------------------------------
+N.closeCliThread:
+ PROLOG
+ mov thisP(bp), b # -->this Node
+ lea debug(bp), a
+ mov a, deP(bp) # save deP
+ lea N.debug(b), a
+ lea D.id(a), a # -->caller debug id
+ push a
+ DEBID "%s CLOSE clients", 1
+ LOG 5, "start..."
+ mov $0, c # offet in socket info array
+ mov $0, d # socket info rank
+ mov N.cliSideP(b), a # -->client socket info array
+0: # iterate on opened sockets
+ cmp $-1, So.sc(a, c) # sc allocated ?
+ je 1f # no, iterate
+ push So.sc(a, c) # socket#
+ LOG 4, "sc=%u"
+ push deP(bp)
+ pushl $0 # indicate "client"
+ push d # socket info rank
+ call N.closeSocket # close socket
+1: inc d
+ cmp N.nodes(b), d # rank < nodes ?
+ je 0f # no, end
+ lea SocketInfoL(c), c # incr offset
+ jmp 0b # iterate
+0: LOG 4, "finished, exiting thread"
+ pushl $0
+ call pthread_exit
+ EPILOG
+#-----------------------------------------------
+# S Y N C H R O N I Z E A L L N O D E S
+#-----------------------------------------------
+ ARGS
+ PROLOC
+ DL thisP
+ DL deP
+ DL shP
+ DL tosignal
+ DL sigmask, 128
+ DL sigact, SigActionL
+ EPILOC
+N.sync:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save -->Debug locally for LOG macro
+
+ movl $0, tosignal(bp)
+ mov C.csP, c # -->CS
+ mov C.shP(c), a # -->shared mem
+ push S.act(a)
+ mov a, shP(bp)
+ push N.pid(b)
+ LOG 5, "%u synchronizing: active=%u"
+# set signal handler block
+ lea sigact(bp), a
+ movl $N.sighandle, sa_handler(a)
+ movl $0, sa_flags(a)
+ lea sa_mask(a), a # set handler mask
+ push a
+ SYS sigfillset
+ jmp 0f
+N.sighandle:
+ ret
+0:
+# activate signal handler
+ lea sigmask(bp), a
+ push a
+ SYS sigemptyset
+ push $SIGUSR2
+ lea sigmask(bp), a
+ push a
+ SYS sigaddset
+ push $0
+ lea sigmask(bp), a
+ push a
+ push $SIG_UNBLOCK
+ SYS sigprocmask
+ push $0
+ lea sigact(bp), a
+ push a
+ push $SIGUSR2
+ SYS sigaction
+# update nodes counter
+ mov shP(bp), a
+ lea S.counter_sem(a), a
+ push a
+ SYS sem_wait # get semaphore
+ mov shP(bp), a
+ decl S.act(a) # decrement nodes counter
+ setz tosignal(bp)
+ mov shP(bp), a
+ lea S.counter_sem(a), a
+ push a
+ SYS sem_post # post semaphore
+ testl $1, tosignal(bp) # all finished?
+ jz N.sigwait
+# signal all tasks
+ push N.pid(b)
+ LOG 5, "%u signaling USR2"
+ push $SIGUSR2
+ push $0
+ SYS kill
+ jmp N.woken
+# wait for signal
+N.sigwait:
+# call pause
+ push $5
+ call sleep
+N.woken:
+ mov thisP(bp), b # -->this Node
+ push N.pid(b)
+ LOG 5, "%u woken up"
+ EPILOG
+#-----------------------------------------------
+# F O R M A T A N D P R I N T S S L E R R O R
+#-----------------------------------------------
+# args
+ ac = 8
+ DS deP # -->Debug
+ DS msgP # -->accompanying msg str
+# returns: nothing
+ PROLOC
+ DL str, 256 # generated string buf
+ EPILOC
+#-----------------------------------------------
+ .global N.sslErr
+N.sslErr:
+ enter $locL, $0
+0:
+ call ERR_get_error
+ push a
+ LOG 5, "ssl err, e=%lu", 1
+ test a, a # end of msg queue ?
+ jz 0f # yes
+ lea str(bp), d
+ push d # -->err msg str buf
+ push a # err
+ call ERR_error_string
+ push a # -->err msg str
+ pushl msgP(bp) # -->accompanying msg str
+ LOG 0, "%s: %s", 2
+ jmp 0b # iterate on SSL err msg queue
+0:
+ pushl $1
+ call exit # ABEND
+#-----------------------------------------------
+# G E T A S S I G N E D I P A D D R
+#-----------------------------------------------
+# puts assigned IP address:port in string
+ ARGS
+ DS sP # -->output string
+ DS saP # -->sockaddr
+ DS deP # -->Debug
+# returns: nothing
+ PROLOC
+ EPILOC
+#-----------------------------------------------
+gai: PROLOG
+
+ mov saP(bp), b # -->sockaddr
+ xor a, a
+ mov sa_data(b), %ax # port # in big endian order
+ push a
+ call ntohs
+ mov a, d
+
+ mov saP(bp), b # -->sockaddr
+ pushl sa_data+2(b) # IP addr in big endian order
+ call ntohl
+
+ push d # port # as int
+
+ mov $4, c # iterate on 4 bytes of IP addr
+0: mov a, d
+ and $0xff, d
+ push d # part of IP addr
+ shr $8, a
+ dec c
+ jnz 0b # iterate
+
+ push $0f
+ pushl sP(bp) # -->output buf
+ call sprintf
+ jmp 1f
+0: .ascii "%u.%u.%u.%u:%d\0"
+1:
+ EPILOG
+#-----------------------------------------------
+ .end
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSc/CS.c Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,158 @@
+#include "CS.h"
+
+CSP csP;
+DebugP deP;
+
+void abend(DebugP deP) {
+// LOG(0, "ABORT, netstat:");
+// int pid;
+// if(!(pid = fork())) {
+// execl("/bin/bash", "/bin/bash", "-c", "netstat -n --inet -a | grep 1[123][0-9][0-9][0-9] >net_stat", (char *) NULL);
+// exit(EXIT_SUCCESS);
+// }
+// waitpid(pid, NULL, 0);
+ LOG(0, "ABORT, backtrace:");
+ back_trace();
+ kill(0, SIGTERM);
+ exit(EXIT_FAILURE);
+}
+char *gpa(struct sockaddr *ai_addr) { // returns string with IP4 address & port assigned to the socket
+ char *s = malloc(64);
+ unsigned short port = *(unsigned short*) ai_addr->sa_data;
+ char *a = ai_addr->sa_data + 2;
+ sprintf(s, "%hhu.%hhu.%hhu.%hhu %hu", a[0], a[1], a[2], a[3], ntohs(port));
+ return s;
+}
+void gai(int level, struct addrinfo *ai, DebugP pr) { // logs assigned IP4 addresses from addrinfo chain
+ struct addrinfo *sa = ai;
+ if (csP->debMaxLev >= level) do {
+ char *s = gpa(sa->ai_addr);
+ strcpy(pr->msg, s); free(s);
+ deb(level, pr);
+ } while ((sa = sa->ai_next));
+ fflush(stderr);
+}
+void ssl_err(DebugP pr, char *s) {
+ long e;
+ e = ERR_get_error();
+ LOG(4, "ssl err, e=%lu", e);
+ while(e) {
+ LOG(0, "%s: %s", s, ERR_error_string(e, NULL));
+ e = ERR_get_error();
+ LOG(4, "ssl err, e=%lu", e);
+ }
+ abend(pr);
+}
+static int getArg(char *a) {
+ return (getenv(a) != NULL) ? atoi(getenv(a)) : 0;
+}
+static void constellation(topology topo, int ssl) {
+ DEBID("%sSSL %s", ssl ? "" : "non", topo==mash ? "MASH" : "RING");
+ int first, nodes, *forw, pid, stat, exitRc = EXIT_SUCCESS;
+ pid_t *pids;
+ if(topo==ring) { first=csP->rp0; nodes=csP->rn; }
+ if(topo==mash) { first=csP->mp0; nodes=csP->mn; }
+ if(nodes == 0) exit(0);
+ if(nodes == 1) { LOG(0, "1 node configuration not implemented"); exit(0); }
+ LOG(1, "%d node(s) starting...", nodes);
+ pids = malloc(nodes*sizeof(int));
+ first += ssl*500;
+ if((forw = mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap");
+ *forw = 1;
+ for(int i = 0; i < nodes; i++) {
+ if(!(pid = fork())) Node(topo, forw, first+i, first, nodes, ssl);
+ else { LOG(3, "node %u established in process %u", first+i, pid); pids[i] = (pid_t)pid; }
+ }
+ LOG(2, "all nodes established");
+ while((pid = wait(&stat)) > 0) {
+ if(WIFEXITED(stat)) {
+ LOG(4, "node process %u ended with exit(%d)", pid, WEXITSTATUS(stat));
+ if(WEXITSTATUS(stat) != EXIT_SUCCESS) exitRc = EXIT_FAILURE;
+ }
+ else { LOG(4, "node process %u crashed", pid); exitRc = EXIT_FAILURE; }
+ }
+ LOG(1, "ENDED %s", exitRc == EXIT_SUCCESS ? "OK" : "BADLY");
+ exit(exitRc);
+}
+int main(int argc, char *argv[]) {
+ CST cs;
+ csP = &cs;
+ deP = &(csP->debug);
+ ShareA(csP->shP);
+ struct sigaction ign;
+
+ if(sem_init(&(csP->shP->debugSem), 1, 1) < 0) ERR("LOG sem_init");
+ debug_init(argv[0]);
+ DEBID("client/server demo");
+ if(getArg("DEB") >= 0) csP->debMaxLev = getArg("DEB");
+
+ if(sem_init(&csP->shP->counterSem, 1, 1) < 0) SYSERR("sem_init");
+ ign.sa_flags = 0;
+ ign.sa_handler = SIG_IGN;
+ if(sigaction(SIGUSR2, &ign, NULL)<0) SYSERR("sigaction");
+
+ csP->shP->msgs = 0;
+ csP->shP->conns = 0;
+ csP->text="bla bla";
+ csP->ttl = 3;
+ csP->rp0 = 11000;
+ csP->mp0 = 12000;
+ csP->rn = 0;
+ csP->mn = 0;
+ csP->pace.tv_sec = csP->pace.tv_nsec = 0;
+ csP->connThreshold = 77; // connection retries threshold
+ csP->connTO = 0.01 * 1000*1000; // connection timeout in usecs
+ csP->selTO = 1; // selection timeout in secs
+ csP->issl = 0;
+ csP->caP = (char*)"/home/local/etc/ssl/certs/";
+ char *sslPathSuffP = "/../CS/";
+
+ if(getenv("T") != NULL) csP->text = getenv("T");
+ if(getenv("CEP") != NULL) {
+ csP->ceP = (char*)malloc(strlen(getenv("CEP")));
+ strcpy(csP->ceP, getenv("CEP")); }
+ else {csP->ceP = (char*)malloc(strlen(dirname(argv[0])) + strlen(sslPathSuffP) + 1);
+ strcpy(csP->ceP, dirname(argv[0]));
+ strcpy(csP->ceP + strlen(dirname(argv[0])), sslPathSuffP); }
+ if(getenv("CAP") != NULL) {
+ csP->caP = (char*)malloc(strlen(getenv("CAP")) + 1);
+ strcpy(csP->caP, getenv("CAP")); }
+ if(getArg("TTL") > 0) csP->ttl = getArg("TTL");
+ if(getArg("RP0") > 0) csP->rp0 = getArg("RP0");
+ if(getArg("MP0") > 0) csP->mp0 = getArg("MP0");
+ if(getArg("N") >= 0) {
+ csP->mn = getArg("N");
+ csP->rn = csP->mn; }
+ if(getArg("SSL") >= 0) csP->issl = getArg("SSL");
+ if(getArg("RN") >= 0) csP->rn = getArg("RN");
+ if(getArg("MN") >= 0) csP->mn = getArg("MN");
+ csP->shP->act = csP->rn + csP->mn; // initialize active node processes counter
+ if(csP->issl > 1) csP->shP->act += csP->shP->act;
+ csP->pacing = 0;
+ if(getenv("P") != NULL) {
+ double d = atof(getenv("P"));
+ csP->pace.tv_sec=(time_t)trunc(d);
+ csP->pace.tv_nsec=(d-csP->pace.tv_sec)*1000*1000*1000;
+ if(csP->pace.tv_sec > 0 || csP->pace.tv_nsec > 0) csP->pacing = 1;
+ }
+ if(getArg("RS") >= 0) srandom(getArg("RS"));
+// LOG(5, "initialized: ssl=%u, ssl path: %s, CA path: %s, pace=%f",
+// csP->issl, csP->ceP, csP->caP, (double)csP->pace.tv_sec+(double)csP->pace.tv_nsec/(1000*1000*1000));
+ LOG(1, "pgm=%s, ttl=%u, pace=%lu.%03lu, seed=%u, SSL=%u, debug=%d",\
+ argv[0], csP->ttl, csP->pace.tv_sec, csP->pace.tv_nsec/(1000*1000), getArg("RS"), csP->issl, csP->debMaxLev);
+ if(csP->issl > 0) LOG(3, "certs path=%s, CA certs path=%s", csP->ceP, csP->caP);
+
+ if(csP->issl < 2) { // 0 = no ssl, 1 = with SSL, 2 = both
+ if(!fork()) constellation(ring, csP->issl);
+ if(!fork()) constellation(mash, csP->issl);
+ } else for(int i = 0; i < 2; i++) {
+ if(!fork()) constellation(ring, i);
+ if(!fork()) constellation(mash, i);
+ }
+
+ int stat, pid;
+ while((pid = wait(&stat)) > 0) if(WIFEXITED(stat)) LOG(4, "constellation process %u ended with exit(%d)", pid, WEXITSTATUS(stat));
+
+ LOG(1, "final balance: forwards=%d, connections=%d", csP->shP->msgs, csP->shP->conns);
+ return 0;
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSc/CS.h Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,155 @@
+#ifndef _CSH_
+#define _CSH_ 1
+
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <signal.h>
+#include <errno.h>
+#include <time.h>
+#include <math.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <sys/time.h>
+#include <openssl/ssl.h>
+#include <openssl/bio.h>
+#include <openssl/err.h>
+#include <sys/wait.h>
+#include <sys/select.h>
+#include <sys/mman.h>
+#include <semaphore.h>
+#include <signal.h>
+#include <execinfo.h>
+#include <libgen.h>
+
+typedef struct DebugS {
+ char id[128];
+ char msg[256];
+} DebugT, *DebugP;
+#define DebugA() (DebugP)malloc(sizeof(DebugT))
+
+typedef struct ShareS {
+ int conns;
+ int msgs;
+ int act;
+ int mashOpenClientCount;
+ int mashOpenSSLClientCount;
+ sem_t counterSem;
+ sem_t debugSem;
+} ShareT, *ShareP;
+#define ShareA(v) if((v = mmap(NULL, sizeof(ShareT), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap")
+
+typedef struct CSS {
+ DebugT debug;
+ int debMaxLev;
+ int port0;
+ int portZ;
+ char *text;
+ int ttl;
+ int mp0;
+ int mn;
+ int rp0;
+ int rn;
+ struct timespec pace;
+ int pacing;
+ int issl;
+ int connThreshold; // connection retry threshhold
+ int connTO; // connection timeout in usecs
+ int selTO; // selection timeout in secs
+ char *ceP; // -->SSL certs path
+ char *caP; // -->CA certs path
+ ShareP shP;
+ char home[128];
+} CST, *CSP;
+#define CSA(v) if((v = (CSP)malloc(sizeof(CST))) < 0) SYSERR("malloc")
+
+typedef struct HeaderS {
+ int ttl;
+ int ts;
+ int listPort;
+} HeaderT, *HeaderP;
+typedef struct PayloadS {
+ time_t ts;
+ char text;
+} PayloadT, *PayloadP;
+typedef struct ContainerS {
+ HeaderT hdr;
+ PayloadT payl;
+} ContainerT, *ContainerP;
+typedef struct DataS {
+ DebugT debug;
+ ContainerP contP;
+ int dataLen;
+} DataT, *DataP;
+#define DataA(v) if((v = (DataP)malloc(sizeof(DataT))) < 0) SYSERR("malloc")
+
+typedef struct SocketS {
+ int remPort;
+ int sc;
+ SSL *sslP;
+} SocketT, *SocketP;
+#define SocketA(v) if((v = (SocketP)malloc(sizeof(SocketT))) < 0) SYSERR("malloc")
+
+typedef enum{ring, mash} topology;
+typedef struct NodeS {
+ DebugT debug;
+ topology topo;
+ int ssc;
+ int locPort;
+ int first;
+ int last;
+ int nodes;
+ int nodeIdx;
+ int kicker;
+ int *forw;
+ int closing;
+ pthread_t closingThread;
+ SocketP cliSides;
+ SocketP srvSides;
+ DataT data;
+ int len;
+ int ssl;
+ SSL_CTX *ctx;
+ struct sigaction *sigactP;
+} NodeT, *NodeP;
+#define NodeA(v) if((v = (NodeP)malloc(sizeof(NodeT))) < 0) SYSERR("malloc")
+typedef enum{client, server} nodeside;
+
+extern void debug_init();
+extern char *gpa(struct sockaddr *);
+extern void gai(int, struct addrinfo *ai, DebugP);
+#define GAI(level, ai) gai(level, ai, deP)
+extern void ssl_err(DebugP, char*);
+extern void abend(DebugP);
+extern void Node(topology, int*, int, int, int, int);
+extern void Data(DataP, DebugP);
+extern int dttlData(DataP);
+extern int ttlData(DataP);
+extern int remPortData(DataP);
+extern DataP loadData(DataP, int, char*);
+extern char *unldData(DataP);
+extern char *digest24Data(DataP, char *);
+extern int chkData(DataP);
+extern void sabotageData(DataP);
+extern int tsData(DataP);
+extern void back_trace();
+extern void pre(char *);
+extern void deb(int, DebugP);
+extern void err(int, char *, DebugP);
+
+#define DEBID(...) sprintf(deP->id, __VA_ARGS__)
+#define LOG(level, ...) sprintf(deP->msg, __VA_ARGS__), deb(level, deP)
+#define SYSERR(e) LOG(0, "%s: %s (%d)", e, strerror(errno), errno), abend(deP)
+#define SSLERR(e) ssl_err(deP, e)
+#define SOFTERR(e) LOG(0, e), fflush(stderr)
+#define HARDERR(e) SOFTERR(e), abend(deP);
+#define ERR(e) perror(e), exit(EXIT_FAILURE)
+
+#define PACING SIGUSR1
+
+extern CSP csP;
+
+#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSc/Data.c Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,47 @@
+#include "CS.h"
+
+void Data(DataP thisP, DebugP callerDeP) {
+ DebugP deP = &(thisP->debug);
+ DEBID("%s DATA", callerDeP->id);
+ thisP->dataLen = sizeof(HeaderT) + sizeof(PayloadT) + strlen(csP->text);
+ thisP->contP = (ContainerP)malloc(thisP->dataLen);
+ thisP->contP->hdr.ttl = 0;
+ thisP->contP->hdr.ts = 0;
+ strcpy(&(thisP->contP->payl.text), "EMPTY");
+ LOG(5, "Data instance established");
+}
+int dttlData(DataP thisP) {
+ thisP->contP->hdr.ttl--;
+ return thisP->contP->hdr.ttl;
+}
+int ttlData(DataP thisP) {
+ return thisP->contP->hdr.ttl;
+}
+int remPortData(DataP thisP) {
+ return thisP->contP->hdr.listPort;
+}
+DataP loadData(DataP thisP, int ttl, char *loadP) {
+ DebugP deP = &(thisP->debug);
+ thisP->contP->hdr.ttl = ttl;
+ thisP->contP->payl.ts = time(NULL);
+ strcpy(&(thisP->contP->payl.text), csP->text);
+ LOG(5, "payload loaded to container");
+ return thisP;
+}
+char *unldData(DataP thisP) {
+ return &(thisP->contP->payl.text);
+}
+char *digest24Data(DataP thisP, char *digest) {
+ char *text = &(thisP->contP->payl.text);
+ if(strlen(text) < 24) strcpy(digest, text);
+ else { strncpy(digest, text, 8); strcpy(digest+8, "-------"); strcpy(digest+15, text+strlen(text)-8); }
+ return digest;
+}
+void sabotageData(DataP thisP) {
+ thisP->contP->payl.text = '?'; }
+int chkData(DataP thisP) {
+ return (int)(strcmp(csP->text, &(thisP->contP->payl.text)) == 0);
+}
+int tsData(DataP thisP) {
+ return thisP->contP->hdr.ts;
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSc/Debug.c Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,68 @@
+#include "CS.h"
+
+char *prg_name;
+struct timeval t0;
+
+void debug_init(char *prgname) {
+ prg_name = prgname;
+ gettimeofday(&t0, NULL);
+}
+void pre(char *o) {
+ fprintf(stderr, "%s\n", o);
+ fflush(stderr);
+}
+void deb(int level, DebugP deP) {
+ if ((csP->debMaxLev >= level && csP->debMaxLev != 7) || (csP->debMaxLev == 7 && level == 7)) {
+ if(sem_wait(&(csP->shP->debugSem))< 0) ERR("LOG sem_wait");
+ struct timeval t;
+ gettimeofday(&t, NULL);
+ fprintf(stderr, "%03ld%06d %s: %s\n", (long)t.tv_sec-t0.tv_sec, (int)t.tv_usec, deP->id, deP->msg);
+ fflush(stderr);
+ if(sem_post(&(csP->shP->debugSem)) < 0) ERR("LOG sem_post");
+ }
+}
+void err(int level, char *o, DebugP deP) {
+ fprintf(stderr, "%s: %s\n", deP->id, o);
+ fflush(stderr);
+}
+void back_trace() {
+ if(sem_wait(&(csP->shP->debugSem))< 0) ERR("LOG sem_wait");
+
+ int j, nptrs;
+ void *buffer[24];
+ char **strings;
+
+ nptrs = backtrace(buffer, 24);
+
+ strings = backtrace_symbols(buffer, nptrs);
+ if (strings == NULL) {
+ ERR("ABORT backtrace_symbols");
+ exit(EXIT_FAILURE);
+ }
+
+ char shcmd[] = "sed -e 's/.*\\[\\(.*\\)\\]$/\\1/' | addr2line -fspe ";
+ char *cmdbuf = malloc(sizeof(shcmd) + 64);
+ strcpy(cmdbuf, shcmd);
+ strcat(cmdbuf, prg_name);
+ #define SIZE 640
+ char *resbuf = malloc(SIZE);;
+ for(j = 0; j < nptrs; j++) {
+ int pc[2], po[2];
+ if(pipe(pc) < 0) ERR("ABORT pipe");
+ if(pipe(po) < 0) ERR("ABORT pipe");
+ if(!fork()) {
+ close(po[0]); dup2(po[1],1);
+ close(pc[1]); dup2(pc[0],0);
+ execl("/bin/bash", "/bin/bash", "-c", cmdbuf, (char *) NULL);
+ }
+ if(write(pc[1], strings[j], strlen(strings[j])) < 0) ERR("ABORT write pipe");
+ if(write(pc[1], "\n", 1) < 0) ERR("ABORT write pipe");
+ close(pc[1]);
+ memset(resbuf, 0, SIZE);
+ if(read(po[0], resbuf, SIZE)) fprintf(stderr, "%s", resbuf);
+ for(int i = 0; i < 2; i++) close(po[i]), close(pc[i]);
+ }
+ if(sem_post(&(csP->shP->debugSem))< 0) ERR("LOG sem_post");
+ free(strings); free(cmdbuf); free(resbuf);
+
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSc/Makefile Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,22 @@
+S := CS.c Debug.c Node.c Data.c
+O := $(S:.c=.o)
+D := $(S:.c=.d)
+CFLAGS = -Wall -D_GNU_SOURCE -lpthread -lm -lrt -lssl -lcrypto
+
+.PHONY: all clean
+all: CS
+clean:
+ rm -f CS $(O)
+
+CS: $(O)
+ gcc $(CFLAGS) $(O) -o $@
+
+%.o: %.c %.d
+ gcc -c $(CFLAGS) -o $@ $<
+
+include $(D)
+%.d: %.c Makefile
+ set -e; rm -f $@; \
+ $(CC) -MM $(CPPFLAGS) $< > $@.$$$$; \
+ sed 's,\($*\)\.o[ :]*,\1.o $@ : ,g' < $@.$$$$ > $@; \
+ rm -f $@.$$$$
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSc/Node.c Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,336 @@
+#include "CS.h"
+
+NodeP thisP;
+DebugP deP;
+DataP dataP;
+
+void sighandle(int sig) { return; }
+
+void bindN() {
+ LOG(4, "binding...");
+ struct addrinfo *sa = (struct addrinfo*)malloc(sizeof(struct addrinfo));
+ memset(sa, 0, sizeof(struct addrinfo));
+ sa->ai_family = AF_INET;
+ sa->ai_socktype = SOCK_STREAM;
+ sa->ai_protocol = 0;
+ sa->ai_flags = AI_PASSIVE;
+ char s[64];
+ sprintf(s, "%d", thisP->locPort);
+ int e;
+ if((e = getaddrinfo(NULL, s, sa, &sa)) != 0) HARDERR(gai_strerror(e));
+ GAI(4, sa);
+ if((thisP->ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc");
+ int opt = 1;
+ if(setsockopt(thisP->ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options");
+ if(bind(thisP->ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind");
+ if(listen(thisP->ssc, 1) < 0) SYSERR("listen");
+ LOG(2, "bound to %d", thisP->locPort);
+}
+void conn(int i, int remPort) {
+ DebugT debug, *deP = &debug;
+ DEBID("%s to %u", thisP->debug.id, remPort);
+ thisP->cliSides[i].remPort = remPort;
+ int e;
+ LOG(4, "connecting to %u...", remPort);
+ int retry = csP->connThreshold;
+ struct addrinfo *ai = (struct addrinfo*)malloc(sizeof(struct addrinfo));
+ memset(ai, 0, sizeof(struct addrinfo));
+ ai->ai_family = AF_INET;
+ ai->ai_socktype = SOCK_STREAM;
+ ai->ai_protocol = 0;
+ ai->ai_flags = 0;
+ char port[6];
+ sprintf(port, "%d", remPort);
+ if((e = getaddrinfo("localhost", port, ai, &ai)) != 0) HARDERR(gai_strerror(e));
+ GAI(3, ai);
+ if((thisP->cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc");
+ while(retry--) {
+ if(connect(thisP->cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) {
+ if(errno != ECONNREFUSED) SYSERR("connect");
+ usleep(csP->connTO);
+ }
+ else break;
+ }
+ if(retry < 1) {
+ LOG(0, "connection refused threshold %d reached", csP->connThreshold);
+ exit(EXIT_FAILURE);
+ }
+ if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
+ csP->shP->conns++;
+ if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
+ socklen_t l = sizeof(struct sockaddr);
+ struct sockaddr *sa = malloc(l);
+ if(getpeername(thisP->cliSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
+ LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->cliSides[i].sc); free(sa);
+
+ if(thisP->ssl) {
+ ERR_clear_error();
+ if(!(thisP->cliSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL");
+ if(!SSL_set_fd(thisP->cliSides[i].sslP, thisP->cliSides[i].sc)) SSLERR("client SSL set fd");
+ if((e = SSL_connect(thisP->cliSides[i].sslP)) < 1) {
+ switch(SSL_get_error(thisP->cliSides[i].sslP, e)) {
+ case SSL_ERROR_SYSCALL: SYSERR("SSL connect"); break;
+ default: SSLERR("SSL connect"); break;
+ }
+ }
+ }
+ LOG(2, "connected via sc=%d after %d retries", thisP->cliSides[i].sc, csP->connThreshold - (retry + 1));
+}
+void acc(int i) {
+ LOG(4, "accepting...");
+ if((thisP->srvSides[i].sc = accept(thisP->ssc, NULL, NULL)) < 0) SYSERR("accept");
+ socklen_t l = sizeof(struct sockaddr);
+ struct sockaddr *sa = malloc(l);
+ if(getpeername(thisP->srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
+ LOG(4, "peer: %s on sc=%d", gpa(sa), thisP->srvSides[i].sc); free(sa);
+ if(thisP->ssl) {
+ int e;
+ ERR_clear_error();
+ if(!(thisP->srvSides[i].sslP = SSL_new(thisP->ctx))) SSLERR("new SSL");
+ if(!SSL_set_fd(thisP->srvSides[i].sslP, thisP->srvSides[i].sc)) SSLERR("server SSL set fd");
+ if((e = SSL_accept(thisP->srvSides[i].sslP)) < 1) {
+ switch(SSL_get_error(thisP->srvSides[i].sslP, e)) {
+ case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break;
+ default: SSLERR("SSL accept"); break;
+ }
+ }
+ }
+ LOG(2, "accepted");
+}
+void closeN(int i, nodeside side) {
+ SocketP sc;
+ if(side) sc = thisP->srvSides; else sc = thisP->cliSides;
+ LOG(5, "closing sc=%d...", sc[i].sc);
+ if(thisP->ssl) {
+ int e;
+ if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)");
+ if(!e) {
+ LOG(5, "SSL shutdown rc=0");
+ if((e = SSL_shutdown(sc[i].sslP)) < 0) {
+ switch(SSL_get_error(sc[i].sslP, e)) {
+ case SSL_ERROR_SYSCALL:
+ if(!(e = ERR_get_error())) {
+ if(errno) SYSERR("SSL shutdown (2)");
+ break;
+ }
+ break;
+ default: SSLERR("SSL shutdown (2)"); break;
+ }
+ }
+ }
+ }
+ close(sc[i].sc);
+ LOG(4, "closed sc=%d", sc[i].sc);
+ sc[i].sc = -1;
+}
+void *close_clients() {
+ DebugT debug, *deP = &debug;
+ DEBID("%s CLOSE clients", thisP->debug.id);
+ LOG(5, "start...");
+ for (int i = 0; i < thisP->nodes; i++) if(thisP->cliSides[i].sc > -1) closeN(i, client);
+ LOG(4, "all clients closed");
+ pthread_exit(NULL);
+}
+void close_node() {
+ if(!thisP->closing) {
+ thisP->closingThread = 0;
+ if(pthread_create(&thisP->closingThread, NULL, &close_clients, NULL) != 0) SYSERR("create thread");
+ thisP->closing = 1;
+ }
+}
+int readN(int i) {
+ LOG(5, "ready to read len=%d from sc=%d...", dataP->dataLen, thisP->srvSides[i].sc);
+ int n, rest = dataP->dataLen;
+ void *buf = dataP->contP;
+ while(rest > 0) {
+ if(thisP->ssl) { if((n = SSL_read(thisP->srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); }
+ else { if((n = read(thisP->srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); }
+ if(n == 0) {
+ LOG(4, "read EOF");
+ return 0;
+ }
+ else {
+ buf += n; rest -= n;
+ if(rest > 0) LOG(5, "partly read %d bytes", n);
+ }
+ }
+ if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
+ csP->shP->msgs++;
+ if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
+ LOG(5, "read %d from %u", dataP->dataLen, dataP->contP->hdr.listPort);
+ return dataP->dataLen;
+}
+int writeN(int i) {
+ DebugT debug, *deP = &debug;
+ DEBID("%s to %u", thisP->debug.id, thisP->cliSides[i].remPort);
+ LOG(5, "ready to write len=%d to sc=%d...", dataP->dataLen, thisP->cliSides[i].sc);
+ int n, rest = dataP->dataLen;
+ void *buf = dataP->contP;
+ while(rest > 0) {
+ if(thisP->ssl) { if((n = SSL_write(thisP->cliSides[i].sslP, buf, rest)) < 0) SSLERR("socket write"); }
+ else { if((n = write(thisP->cliSides[i].sc, buf, rest)) < 0) SYSERR("socket write"); }
+ buf += n; rest -= n;
+ if(rest > 0) LOG(5, "partly written %d bytes", n);
+ }
+ LOG(5, "written %d", dataP->dataLen);
+ return dataP->dataLen;
+}
+int getN(int i) {
+ return readN(i) > 0;
+}
+int putN(int i) {
+ dataP->contP->hdr.listPort = thisP->locPort;
+ return writeN(i) > 0;
+}
+int next_node() {
+ int next;
+ if(thisP->topo == ring) {
+ next = thisP->locPort + 1;
+ if(next > thisP->last) next = thisP->first;
+ }
+ else while((next = thisP->first + thisP->nodes * ((float)random() / RAND_MAX)) == thisP->locPort);
+ return next;
+}
+void forward(int sci) {
+ int next, scn;
+ char digest[24];
+ if(getN(sci)) {
+ LOG(5, "received data from %u", remPortData(dataP));
+ if(thisP->kicker) {
+ LOG(4, "received from node %u: %s, ttl=%d",
+ remPortData(dataP), digest24Data(dataP, digest), ttlData(dataP));
+// if(ttlData(dataP) == 2) sabotageData(dataP);
+// if(ttlData(dataP) == 2) errno=0, SYSERR("signal test");
+ if(dttlData(dataP) <= 0) {
+ LOG(1, "received after passing all %s: %s", thisP->topo==mash ? "mashes" : "rings",digest24Data(dataP, digest));
+ close_node();
+ *(thisP->forw) = 0;
+ LOG(4, "leaving forward closing");
+ return;
+ }
+ }
+ next = next_node(); scn = next - thisP->first;
+ LOG(5, "forwarding to %d, len=%d, ttl=%d --->", next, dataP->dataLen, ttlData(dataP));
+ if(thisP->cliSides[scn].sc < 0) conn(scn, next);
+ if(*(thisP->forw) && csP->pacing) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); }
+ putN(scn);
+ LOG(5, "forwarded to %u", next);
+ }
+ else {
+ close_node();
+ closeN(sci, server);
+ }
+ return;
+}
+void main_loop() {
+ sigset_t pacing;
+ sigemptyset(&pacing);
+ sigaddset(&pacing, PACING);
+ union { // simple select mask debug
+ fd_set rs;
+ uint mask;
+ } u;
+ int nfds;
+ FD_ZERO(&(u.rs)); nfds = 0;
+ if(*(thisP->forw)) {
+ FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; }
+ while(nfds) {
+ struct timeval t = {csP->selTO, 0};
+ LOG(5, "selecting, mask=%08x", u.mask);
+ int rc;
+ rc = select(nfds, &(u.rs), NULL, NULL, &t);
+ if(rc < 0 && errno != EINTR) SYSERR("select");
+ if(rc > 0) {
+ LOG(5, "return from select, mask=%08x", u.mask);
+ if(FD_ISSET(thisP->ssc, &(u.rs))) { // ssc posted: accept & forward
+ int i;
+ for(i=0; thisP->srvSides[i].sc > -1 && i < thisP->nodes; i++); // find unused slot for accept
+ if(i == thisP->nodes) HARDERR("can't accept, all slots in use");
+ LOG(5, "slot for accept=%d", i);
+ acc(i);
+ forward(i);
+ }
+ else // check which connected socket is posted
+ for(int i = 0; i < thisP->nodes; i++)
+ if(thisP->srvSides[i].sc > -1 && FD_ISSET(thisP->srvSides[i].sc, &(u.rs))) forward(i);
+ }
+ FD_ZERO(&(u.rs)); nfds = 0;
+ if(*(thisP->forw)) { FD_SET(thisP->ssc, &(u.rs)); if(thisP->ssc >= nfds) nfds = thisP->ssc + 1; }
+ for(int i = 0; i < thisP->nodes; i++) { // mask all connected client side sockets for select
+ int sc = thisP->srvSides[i].sc;
+ if(sc > -1) { FD_SET(sc, &(u.rs)); if(sc >= nfds) nfds = sc + 1; }
+ }
+ }
+}
+void Node(topology topo, int *forw, int port, int first, int n, int ssl) {
+ NodeT this;
+ thisP = &this;
+ deP = &(thisP->debug);
+ DEBID("%sSSL %s node %d", ssl ? "" : "non", topo==mash ? "MASH" : "RING", port);
+ LOG(4, "initializing...");
+ dataP = &thisP->data;
+ Data(dataP, deP);
+ thisP->topo = topo;
+ thisP->locPort = port;
+ thisP->first = first;
+ thisP->last = first + n - 1;
+ thisP->nodes = n;
+ thisP->cliSides = malloc(n*sizeof(SocketT));
+ thisP->srvSides = malloc(n*sizeof(SocketT));
+ for(int k=0; k<n; k++) thisP->cliSides[k].sc = thisP->srvSides[k].sc = -1;
+ thisP->kicker = (port == first);
+ thisP->forw = forw;
+ thisP->nodeIdx = port - first;
+ thisP->closing = 0;
+ thisP->ssl = ssl;
+ if(thisP->ssl) {
+ char s[64];
+ SSL_load_error_strings();
+ SSL_library_init();
+ LOG(4, "setting SSL contex...");
+ if(!(thisP->ctx = SSL_CTX_new(TLS_method()))) SSLERR("new SSL CTX");
+ SSL_CTX_set_mode(thisP->ctx, SSL_MODE_AUTO_RETRY);
+ SSL_CTX_set_verify(thisP->ctx, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
+ sprintf(s, "%s/keys/%d.key", csP->ceP, thisP->locPort);
+ if(SSL_CTX_use_PrivateKey_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file");
+ sprintf(s, "%s/certs/%d.pem", csP->ceP, thisP->locPort);
+ LOG(5, "SSL private key used: %s", s);
+ if(SSL_CTX_use_certificate_file(thisP->ctx, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file");
+ LOG(5, "SSL certificate used: %s", s);
+ if(SSL_CTX_load_verify_locations(thisP->ctx, NULL, csP->caP) != 1) SSLERR("hh's thrusted certs path");
+ }
+ LOG(5, "initalized");
+
+ bindN(thisP);
+ if(thisP->kicker) {
+ loadData(dataP, csP->ttl, csP->text);
+ int sci, next;
+ next = next_node(); sci = next - thisP->first;
+ char digest[24];
+ LOG(1, "KICKER: ready to initial send %s, len=%d to node %d", digest24Data(dataP, digest), dataP->dataLen, next);
+ conn(sci, next);
+ putN(sci);
+ }
+
+ main_loop();
+
+ LOG(5, "closing ssc");
+ close(thisP->ssc);
+ if(thisP->closing) { // wait for closing thread
+ if(pthread_join(thisP->closingThread, NULL) != 0) SYSERR("join closing thread"); }
+ struct sigaction sigact;
+ sigfillset(&sigact.sa_mask);
+ sigact.sa_handler=sighandle;
+ if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction");
+ if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait");
+ int active = --csP->shP->act;
+ if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post");
+ if(active > 0) pause();
+ else kill(0, SIGUSR2);
+ int exitRc = EXIT_SUCCESS;
+ if(thisP->kicker && !chkData(dataP)) {
+ SOFTERR("INPUT AND OUTPUT DIFFER");
+ exitRc = EXIT_FAILURE; }
+ LOG(2, "ended");
+ exit(exitRc);
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CScpp/CS.cpp Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,519 @@
+#include "CS.h"
+
+CSP csP;
+
+void sighandle(int sig) { return; }
+void abend(DebugP deP) {
+ int pid;
+ //LOG(0, "ABORT, netstat:");
+ //if(!(pid = fork())) {
+ // execl("/bin/bash", "/bin/bash", "-c", "netstat -n --inet -a | grep 1[123][0-9][0-9][0-9] >net_stat", (char *) NULL);
+ // exit(EXIT_SUCCESS);
+ //}
+ waitpid(pid, NULL, 0);
+ LOG(0, "ABORT, backtrace:");
+ deP->back_trace();
+ kill(0, SIGTERM);
+ exit(EXIT_FAILURE);
+}
+char *gpa(struct sockaddr *ai_addr) { // returns string with IP4 address & port assigned to the socket
+ char *s = (char*) malloc(64);
+ unsigned short port = *(unsigned short*) ai_addr->sa_data;
+ char *a = ai_addr->sa_data + 2;
+ sprintf(s, "%hhu.%hhu.%hhu.%hhu %hu", a[0], a[1], a[2], a[3], ntohs(port));
+ return s;
+}
+void gai(int level, struct addrinfo *ai, DebugP deP) { // logs assigned IP4 addresses from addrinfo chain
+ struct addrinfo *sa = ai;
+ if (deP->debug >= level) do {
+ char *s = gpa(sa->ai_addr);
+ strcpy(deP->s, s); free(s);
+ deP->deb(level);
+ } while ((sa = sa->ai_next));
+ fflush(stderr);
+}
+void ssl_err(char *s, DebugP deP) {
+ long e = ERR_get_error();
+ while(e) {
+ LOG(0, "%s: %s", s, ERR_error_string(e, NULL));
+ e = ERR_get_error();
+ }
+}
+static int getArg(const char *a) {
+ return (getenv(a) != NULL) ? atoi(getenv(a)) : 0; }
+HeaderS::HeaderS() {
+ this->ttl = 0; }
+HeaderS::HeaderS(int ttl) {
+ this->ttl = ttl; }
+int HeaderS::len() {
+ return sizeof(HeaderS); }
+PayloadS::PayloadS() {
+ strcpy(&text, "EMPTY"); }
+PayloadS::PayloadS(const char *text) {
+ ts = time(NULL);
+ strcpy(&(this->text), text); }
+int PayloadS::check(PayloadP p) {
+ return (int)(strcmp(&text, &p->text) == 0); }
+char *PayloadS::deliver() {
+ return &text; }
+string PayloadS::digest() {
+ string payl_s = string(&text);
+ if(payl_s.size() < 24) return payl_s;
+ else return payl_s.substr(0, 8) + string("--------") + payl_s.substr(payl_s.size() - 8, 8); }
+void PayloadS::sabotage() {
+ text = '?'; }
+int PayloadS::len() {
+ return sizeof(PayloadS) + strlen(&text); }
+int ContainerS::len() {
+ return hdr.len() + payl.len(); }
+DataC::DataC() {}
+DataC::DataC(DebugP callerDeP) {
+ deP = new DebugC();
+ DEBID("%s DATA", callerDeP->debid);
+ dataLen = sizeof(HeaderS) + sizeof(PayloadS) + csP->text.size();
+ contP = static_cast<ContainerP>(operator new(dataLen));
+ new(&contP->hdr) HeaderS();
+ new(&contP->payl) PayloadS();
+ LOG(5, "Empty Data instance established");
+}
+void DataC::load(int ttl, const char *text) {
+ new(&contP->hdr) HeaderS(ttl);
+ new(&contP->payl) PayloadS(text);
+ LOG(5, "Data instance loaded with payload");
+ return;
+}
+string DataC::unld() {
+ return string(&(contP->payl.text)); }
+string DataC::digest() {
+ return contP->payl.digest(); }
+int DataC::dttl() {
+ return --contP->hdr.ttl; }
+int DataC::ttl() {
+ return contP->hdr.ttl; }
+bool DataC::dataOk() {
+ return csP->text == string(&(contP->payl.text)); }
+int DataC::ts() {
+ return contP->hdr.ts; }
+int DataC::remPort(int remPort) {
+ return (contP->hdr.remPort = remPort); }
+int DataC::remPort() {
+ return contP->hdr.remPort; }
+NodeC::NodeC(ConstellationP co, int port) {
+ deP = new DebugC();
+ DEBID("%sSSL %s node %d", co->ssl ? "" : "non", co->topo==mash ? "MASH" : "RING", port);
+ LOG(4, "intializing ...");
+ data = DataC(deP);
+ topo = co->topo;
+ locPort = port;
+ first = co->first;
+ nodes = co->nodes;
+ last = first + nodes - 1;
+ cliSides = new SocketS[nodes];
+ srvSides = new SocketS[nodes];
+ for(int k=0; k<nodes; k++) cliSides[k].sc = srvSides[k].sc = -1;
+ kicker = (locPort == first);
+ forwP = co->forwP;
+ closing = 0;
+ ssl = co->ssl;
+ ssc = 0;
+ if(ssl) {
+ char s[128];
+ SSL_load_error_strings();
+ SSL_library_init();
+ LOG(4, "setting SSL contex...");
+ if(!(ctxP = SSL_CTX_new(SSLv23_method()))) SSLERR("new SSL CTX");
+ SSL_CTX_set_mode(ctxP, SSL_MODE_AUTO_RETRY);
+ SSL_CTX_set_verify(ctxP, SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL); // SSL will claim partner certificate
+ sprintf(s, "%s/keys/%d.key", csP->cePath.c_str(), locPort);
+ LOG(5, "SSL private key used: %s", s);
+ if(SSL_CTX_use_PrivateKey_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's key file");
+ sprintf(s, "%s/certs/%d.pem", csP->cePath.c_str(), locPort);
+ LOG(5, "SSL certificate used: %s", s);
+ if(SSL_CTX_use_certificate_file(ctxP, s, SSL_FILETYPE_PEM) != 1) SSLERR("hh's cert file");
+ LOG(5, "SSL: CApath: %s", csP->caPath.c_str());
+ if(SSL_CTX_load_verify_locations(ctxP, NULL, csP->caPath.c_str()) != 1) SSLERR("hh's thrusted certs path");
+ }
+ LOG(5, "initalized");
+}
+int NodeC::run() {
+ LOG(5, "binding, kicker=%d", kicker);
+ bindN();
+ if(kicker) {
+ data.load(csP->ttl, csP->text.c_str());
+ int sci, next;
+ next = next_node(); sci = next - first;
+ LOG(2, "ready to initial send %s, len=%d to node %d", data.digest().c_str(), data.dataLen, next);
+ conn(sci, next);
+ putN(sci);
+ }
+ mainLoop();
+ LOG(5, "closing ssc");
+ close(ssc);
+ if(closing) closingThread.join(); // wait for closing thread
+ struct sigaction sigact;
+ sigfillset(&sigact.sa_mask);
+ sigact.sa_handler = sighandle;
+ if(sigaction(SIGUSR2,&sigact,NULL) < 0) SYSERR("sigaction");
+ if(sem_wait(&csP->shP->counterSem) < 0) SYSERR("sem_wait");
+ if(--csP->shP->act == 0) {
+ if(sem_post(&csP->shP->counterSem) < 0) SYSERR("sem_post");
+ kill(0, SIGUSR2);
+ }
+ else {
+ if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
+ pause();
+ }
+ int exitRc = EXIT_SUCCESS;
+ if(kicker && !data.dataOk()) {
+ SOFTERR("input and output differ");
+ exitRc = EXIT_FAILURE;
+ }
+ LOG(2, "ended");
+ exit(exitRc);
+}
+void NodeC::mainLoop() {
+ fd_set rs;
+ int nfds;
+ struct timeval t = {1, 0};
+ nfds = 0; FD_ZERO(&rs);
+ if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; }
+ while(nfds) {
+ t.tv_sec = csP->selTO;
+ int rc;
+ rc = select(nfds, &rs, NULL, NULL, &t);
+ if(rc < 0 && errno != EINTR) SYSERR("select");
+ if(rc > 0) {
+ if(FD_ISSET(ssc, &rs)) {
+ int i;
+ for(i=0; srvSides[i].sc > -1 && i < nodes; i++); // find unused slot for accept
+ if(i == nodes) HARDERR("can't accept, all slots in use");
+ LOG(5, "slot for accept=%d", i);
+ acc(i);
+ forward(i);
+ }
+ else
+ for(int i = 0; i < nodes; i++) {
+ if(srvSides[i].sc > -1 && FD_ISSET(srvSides[i].sc, &rs)) forward(i); }
+ }
+ FD_ZERO(&rs); nfds = 0; t.tv_sec = 1;
+ for(int i = 0; i < nodes; i++) {
+ int sc = srvSides[i].sc;
+ if(sc > -1) { FD_SET(sc, &rs); if(sc >= nfds) nfds = sc + 1; }
+ }
+ if(*(forwP)) { FD_SET(ssc, &rs); if(ssc >= nfds) nfds = ssc + 1; }
+ }
+}
+void NodeC::forward(int sci) {
+ int next, scn;
+ if(getN(sci)) {
+ LOG(5, "received data from %u", data.remPort());
+ if(kicker) {
+ LOG(3, "received from node %u: %s, ttl=%d", data.remPort(), data.digest().c_str(), data.ttl());
+ if(data.dttl() <= 0) {
+ LOG(1, "received after passing all %s: %s", topo==mash ? "mashes" : "rings", data.digest().c_str());
+ closeClients();
+ *(forwP) = 0;
+ LOG(4, "leaving forward closing");
+ return;
+ }
+ }
+ next = next_node(); scn = next - first;
+ LOG(5, "forwarding len=%d to %d --->", data.dataLen, next);
+ if(cliSides[scn].sc < 0) conn(scn, next);
+ if(*(forwP)) { LOG(5, "pacing..."); nanosleep(&(csP->pace), NULL); }
+ putN(scn);
+ LOG(5, "forwarded to %u", next);
+ }
+ else {
+ closeClients();
+ closeSocket(sci, server);
+ }
+ return;
+}
+void NodeC::bindN() {
+ LOG(4, "binding...");
+ struct addrinfo *sa = (struct addrinfo*)malloc(sizeof(struct addrinfo));
+ memset(sa, 0, sizeof(struct addrinfo));
+ sa->ai_family = AF_INET;
+ sa->ai_socktype = SOCK_STREAM;
+ sa->ai_protocol = 0;
+ sa->ai_flags = AI_PASSIVE;
+ char s[64];
+ sprintf(s, "%d", locPort);
+ int e;
+ if((e = getaddrinfo(NULL, s, sa, &sa)) != 0) HARDERR(gai_strerror(e));
+ GAI(4, sa);
+ if((ssc = socket(sa->ai_family, sa->ai_socktype, sa->ai_protocol)) < 0) SYSERR("socket alloc");
+ int opt = 1;
+ if(setsockopt(ssc, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) SYSERR("set socket options");
+ if(bind(ssc, sa->ai_addr, sa->ai_addrlen) < 0) SYSERR("bind");
+ if(listen(ssc, 1) < 0) SYSERR("listen");
+ LOG(2, "bound to %d", locPort);
+}
+void NodeC::conn(int i, int remPort) {
+ DebugP deP = new DebugC();
+ DEBID("%s to %u", this->deP->debid, remPort);
+ cliSides[i].remPort = remPort;
+ int e;
+ LOG(4, "connecting to %u...", remPort);
+ int retry = csP->connThreshold;
+ struct addrinfo *ai = (struct addrinfo*)malloc(sizeof(struct addrinfo));
+ memset(ai, 0, sizeof(struct addrinfo));
+ ai->ai_family = AF_INET;
+ ai->ai_socktype = SOCK_STREAM;
+ ai->ai_protocol = 0;
+ ai->ai_flags = 0;
+ char port[6];
+ sprintf(port, "%d", remPort);
+ if((e = getaddrinfo("localhost", port, ai, &ai)) != 0) HARDERR(gai_strerror(e));
+ GAI(3, ai);
+ if((cliSides[i].sc = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) SYSERR("socket alloc");
+ while(retry--) {
+ if(connect(cliSides[i].sc, ai->ai_addr, ai->ai_addrlen) < 0) {
+ if(errno != ECONNREFUSED) SYSERR("connect");
+ usleep(csP->connTO); }
+ else break; }
+ if(retry < 1) {
+ LOG(0, "connection refused threshold %d reached", csP->connThreshold);
+ exit(EXIT_FAILURE); }
+ if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
+ csP->shP->conns++;
+ if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
+ socklen_t l = sizeof(struct sockaddr);
+ struct sockaddr *sa = (sockaddr*)malloc(l);
+ if(getpeername(cliSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
+ LOG(4, "peer: %s on sc=%d", gpa(sa), cliSides[i].sc); free(sa);
+ if(ssl) {
+ ERR_clear_error();
+ if(!(cliSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL");
+ if(!SSL_set_fd(cliSides[i].sslP, cliSides[i].sc)) SSLERR("client SSL set fd");
+ if((e = SSL_connect(cliSides[i].sslP)) < 1) {
+ switch(SSL_get_error(cliSides[i].sslP, e)) {
+ case SSL_ERROR_SYSCALL:
+ ssl_err((char*)"SSL connect", deP);
+ if(e == 0) LOG(0, "SSL connect: EOF on socket");
+ else LOG(0, "SSL connect: %s (%d)", strerror(errno), errno);
+ abend(deP);
+ break;
+ default:
+ SSLERR("SSL connect");
+ break;
+ }
+ }
+ }
+ LOG(2, "connected via sc=%d after %d retries", cliSides[i].sc, csP->connThreshold - (retry + 1));
+}
+void NodeC::acc(int i) {
+ LOG(4, "accepting...");
+ if((srvSides[i].sc = accept(ssc, NULL, NULL)) < 0) SYSERR("accept");
+ socklen_t l = sizeof(struct sockaddr);
+ struct sockaddr *sa = (sockaddr*)malloc(l);
+ if(getpeername(srvSides[i].sc, sa, &l) < 0) SYSERR("getpeername");
+ LOG(4, "peer: %s on sc=%d", gpa(sa), srvSides[i].sc); free(sa);
+ if(ssl) {
+ int e;
+ ERR_clear_error();
+ if(!(srvSides[i].sslP = SSL_new(ctxP))) SSLERR("new SSL");
+ if(!SSL_set_fd(srvSides[i].sslP, srvSides[i].sc)) SSLERR("server SSL set fd");
+ if((e = SSL_accept(srvSides[i].sslP)) < 1) {
+ switch(SSL_get_error(srvSides[i].sslP, e)) {
+ case SSL_ERROR_SYSCALL: SYSERR("SSL accept"); break;
+ default: SSLERR("SSL accept"); break;
+ }
+ }
+ }
+ LOG(2, "accepted");
+}
+void NodeC::closeSocket(int i, nodeside side) {
+ SocketP sc;
+ if(side) sc = srvSides; else sc = cliSides;
+ LOG(5, "closing sc=%d...", sc[i].sc);
+ if(ssl) {
+ int e;
+ if((e = SSL_shutdown(sc[i].sslP)) < 0) SYSERR("SSL shutdown (1)");
+ if(!e) {
+ LOG(5, "SSL shutdown rc=0");
+ if((e = SSL_shutdown(sc[i].sslP)) < 0) {
+ switch(SSL_get_error(sc[i].sslP, e)) {
+ case SSL_ERROR_SYSCALL: {
+ long e;
+ if(!(e = ERR_get_error()) && errno) SYSERR("SSL shutdown (2)");
+ break;
+ }
+ default:
+ SSLERR("SSL shutdown (2)");
+ break;
+ }
+ }
+ }
+ }
+ close(sc[i].sc);
+ LOG(4, "closed sc=%d", sc[i].sc);
+ sc[i].sc = -1;
+}
+void closeCliTh(void *p) {
+ NodeP nP = (NodeP)p;
+ char *callerid = nP->deP->debid;
+ DebugP deP = new DebugC();
+ DEBID("%s CLOSE clients thread", callerid);
+ LOG(5, "start...");
+ for (int i = 0; i < nP->nodes; i++) if(nP->cliSides[i].sc > -1) nP->closeSocket(i, client);
+ LOG(4, "all clients closed");
+}
+void NodeC::closeClients() {
+ if(!closing) try { closingThread = thread(closeCliTh, this); } catch(exception e) {
+ cout << "closing tread: " << e.what() << '\n'; }
+ closing = 1;
+}
+int NodeC::readN(int i) {
+ LOG(5, "to read len=%d from sc=%d...", data.dataLen, srvSides[i].sc);
+ int n, rest = data.dataLen;
+ char *buf = (char*)data.contP;
+ while(rest > 0) {
+ if(ssl) { if((n = SSL_read(srvSides[i].sslP, buf, rest)) < 0) SSLERR("read socket"); }
+ else { if((n = read(srvSides[i].sc, buf, rest)) < 0) SYSERR("read socket"); }
+ if(n == 0) {
+ LOG(4, "read EOF");
+ return 0;
+ }
+ else {
+ buf += n; rest -= n;
+ if(rest > 0) LOG(5, "partly read %d bytes, buf=%p", n, buf);
+ }
+ }
+ if(sem_wait(&(csP->shP->counterSem)) < 0) SYSERR("sem_wait");
+ csP->shP->msgs++;
+ if(sem_post(&(csP->shP->counterSem)) < 0) SYSERR("sem_post");
+ LOG(5, "read %d from %u", data.dataLen, data.remPort());
+ return (data.dataLen);
+}
+int NodeC::writeN(int i) {
+ LOG(5, "to write len=%d to sc=%d...", data.dataLen, cliSides[i].sc);
+ int n, rest = data.dataLen;
+ char *buf = (char*)data.contP;
+ while(rest > 0) {
+ if(ssl) { if((n = SSL_write(cliSides[i].sslP, data.contP, data.dataLen)) < 0) SSLERR("socket write"); }
+ else { if((n = write(cliSides[i].sc, data.contP, data.dataLen)) < 0) SYSERR("socket write"); }
+ buf += n; rest -= n;
+ if(rest > 0) LOG(5, "partly written %d bytes", n);
+ }
+ LOG(5, "written %d", data.dataLen);
+ return data.dataLen;
+}
+int NodeC::getN(int i) { return readN(i) > 0; }
+int NodeC::putN(int i) {
+ data.remPort(locPort);
+ return writeN(i) > 0; }
+int NodeC::next_node() {
+ int next;
+ if(topo == ring) {
+ next = locPort + 1;
+ if(next > last) next = first;
+ }
+ else while((next = first + nodes * ((float)random() / RAND_MAX)) == locPort);
+ return next;
+}
+ConstellationC::ConstellationC() {
+ deP = NULL;
+ forwP = NULL;
+ topo = ring;
+ ssl = first = nodes = 0;
+};
+ConstellationC::ConstellationC(topology topo, int ssl) {
+ deP = new DebugC();
+ DEBID("%sSSL %s", ssl ? "" : "non", topo==mash ? "MASH" : "RING");
+ this->topo = topo;
+ if(topo == ring) { first=csP->rp0; nodes=csP->rn; }
+ if(topo == mash) { first=csP->mp0; nodes=csP->mn; }
+ this->ssl = ssl;
+ first += ssl*500;
+ forwP = NULL;
+}
+int ConstellationC::run() {
+ int stat = 0, pid = 0, exitRc = EXIT_SUCCESS;
+ if(nodes == 0) exit(0);
+ if(nodes == 1) { LOG(0, "1 node configuration not implemented"); exit(0); }
+ pid_t *pids = new pid_t[nodes];
+ LOG(1, "%d nodes starting...", nodes);
+ if((forwP = (int*)mmap(NULL, sizeof(int), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap");
+ *forwP = 1;
+ for(int port = first; port < first + nodes; port++) {
+ if(!(pid = fork())) (new NodeC(this, port))->run();
+ else { pids[port-first] = pid; LOG(4, "node %u established in process %u", port, pid); }
+ }
+ LOG(2, "all nodes established");
+ while((pid = wait(&stat)) > 0)
+ if(WIFEXITED(stat)) {
+ LOG(4, "node process %u ended with exit(%d)", pid, WEXITSTATUS(stat));
+ if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE;
+ }
+ else exitRc = EXIT_FAILURE;
+ LOG(1, "ENDED %s", exitRc == EXIT_SUCCESS ? "OK" : "with ERROR");
+ exit(exitRc);
+}
+CSS::CSS(DebugP deP, char *prgnP) {
+ ShareA(shP);
+ shP->msgs = 0;
+ shP->conns = 0;
+ if(sem_init(&shP->counterSem, 1, 1) < 0) SYSERR("sem_init");
+ text = "bla bla";
+ ttl = 3;
+ rp0 = 11000;
+ mp0 = 12000;
+ rn = 0;
+ mn = 0;
+ issl = 0;
+ const string sslPathSuffP = "../CS";
+ caPath = "/home/local/etc/ssl/certs/";
+ connThreshold = 77; // connection retries threshold
+ connTO = 0.01 * 1000*1000; // connection sleep time in usecs
+ selTO = 1; // selection timeout in secs
+
+ if(getArg("DEB") >= 0) DebugC::debug = getArg("DEB");
+ if(getenv("T") != NULL) { text = getenv("T"); }
+ if(getenv("CEP") != NULL) cePath = getenv("CEP");
+ else { cePath = dirname(prgnP); cePath += "/"; cePath += sslPathSuffP; }
+ if(getenv("CAP") != NULL) caPath = getenv("CAP");
+ if(getArg("TTL") > 0) ttl = getArg("TTL");
+ if(getArg("RP0") > 0) rp0 = getArg("RP0");
+ if(getArg("MP0") > 0) mp0 = getArg("MP0");
+ if(getArg("N") >= 0) { mn = getArg("N"); rn = mn; }
+ if(getArg("SSL") >= 0) issl = getArg("SSL");
+ if(getArg("RN") >= 0) rn = getArg("RN");
+ if(getArg("MN") >= 0) mn = getArg("MN");
+ shP->act = rn + mn; // initialize active node processes counter
+ if(issl > 1) shP->act += shP->act;
+ pacing = 0;
+ if(getenv("P") != NULL) {
+ double d = atof(getenv("P"));
+ pace.tv_sec=(time_t)trunc(d);
+ pace.tv_nsec=(d-pace.tv_sec)*1000*1000*1000;
+ if(pace.tv_sec > 0 || pace.tv_nsec > 0) pacing = 1;
+ }
+ if(getArg("RS") >= 0) srandom(getArg("RS"));
+}
+int main(int argc, char *argv[]) {
+ DebugC::debug_init(argv[0]);
+ DebugP deP = new DebugC();
+ DEBID("client/server demo");
+ csP = new CSS(deP, argv[0]);
+ LOG(1, "pgm=%s, ttl=%u, pace=%lu.%03lu, seed=%u, SSL=%u, debug=%d",\
+ argv[0], csP->ttl, csP->pace.tv_sec, csP->pace.tv_nsec/(1000*1000), getArg("RS"), csP->issl, DebugC::debug);
+ if(csP->issl > 0) LOG(3, "certs path=%s, CA certs path=%s", csP->cePath.c_str(), csP->caPath.c_str());
+ if(csP->issl < 2) {
+ if(!fork()) (new ConstellationC(ring, csP->issl))->run();
+ if(!fork()) (new ConstellationC(mash, csP->issl))->run();
+ } else for(int ssl = 0; ssl < csP->issl; ssl++) {
+ if(!fork()) (new ConstellationC(ring, ssl))->run();
+ if(!fork()) (new ConstellationC(mash, ssl))->run();
+ }
+ int stat, exitRc = EXIT_SUCCESS;
+ while(wait(&stat) > 0)
+ if(WIFEXITED(stat)) {
+ LOG(5, "constellation ended with exit(%d)", WEXITSTATUS(stat));
+ if(WEXITSTATUS(stat)) exitRc = EXIT_FAILURE;
+ }
+ else exitRc = EXIT_FAILURE;
+ LOG(1, "%s end, forwards=%d, connections=%d", exitRc == EXIT_SUCCESS ? "NORMAL" : "BAD", csP->shP->msgs, csP->shP->conns);
+ exit(exitRc);
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CScpp/CS.h Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,171 @@
+#ifndef _CSH_
+#define _CSH_ 1
+
+using namespace std;
+#include <string>
+#include <iostream>
+#include <thread>
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <signal.h>
+#include <errno.h>
+#include <time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <sys/time.h>
+#include <openssl/ssl.h>
+#include <openssl/bio.h>
+#include <openssl/err.h>
+#include <sys/wait.h>
+#include <sys/select.h>
+#include <sys/mman.h>
+#include <semaphore.h>
+#include <libgen.h>
+#include <math.h>
+#include "Debug.h"
+
+typedef struct ShareS { // shared items between procs or threads
+ int conns; // overall connections# both in ring and mash
+ int msgs; // overall forwards# both in ring and mash
+ int act; // active nodes# both in ring and mash
+ sem_t counterSem; // semaphore for counters
+} ShareT, *ShareP;
+#define ShareA(v) if((v = (ShareP) mmap(NULL, sizeof(ShareT), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) SYSERR("mmap")
+
+typedef struct CSS { // top level attributes
+ string text; // text to be sent in messages
+ int ttl; // TTL for circulating msgs
+ int mp0; // TCP port of first mash node
+ int mn; // intended # of nodes in mash
+ int rp0; // TCP port of first ring node
+ int rn; // intended # of nodes in ring
+ struct timespec pace; // pacing time quantum
+ int pacing; // pacing indicator
+ int issl; // ssl switch: 0=no_ssl, 1=ssl, 2=both
+ int connThreshold; // connection retry threshhold
+ int connTO; // connection timeout in usecs
+ int selTO; // selection timeout in secs
+ string cePath; // SSL certs path
+ string caPath; // CA certs path
+ ShareP shP;
+ CSS(DebugP, char*);
+} *CSP;
+
+typedef struct HeaderS { // container header
+ int ttl;
+ int ts;
+ int remPort;
+
+ HeaderS();
+ HeaderS(int);
+ int len();
+} *HeaderP;
+
+typedef struct PayloadS { // payload structure
+ time_t ts;
+ char text;
+
+ PayloadS();
+ PayloadS(const char *);
+ int check(PayloadS*);
+ char *deliver();
+ string digest();
+ void sabotage();
+ int len();
+} *PayloadP;
+
+typedef struct ContainerS { // data container sent throug mash or ring
+ HeaderS hdr;
+ PayloadS payl;
+ int len();
+} ContainerT, *ContainerP;
+
+typedef class DataC { // data sent and received
+public:
+ DebugP deP;
+ ContainerP contP;
+ int dataLen;
+// int transferLen;
+
+ DataC();
+ DataC(DebugP);
+ int dttl();
+ int ttl();
+ void load(int, const char*);
+ string unld();
+ string digest();
+ bool dataOk();
+ int ts();
+ int remPort();
+ int remPort(int);
+} *DataP;
+
+typedef struct SocketS { // info about sockets allocated in node
+ int remPort;
+ int sc;
+ SSL *sslP;
+} *SocketP;
+
+typedef enum{ring, mash} topology;
+typedef enum{client, server} nodeside;
+class ConstellationC { // constellation of communication nodes (mash or ring)
+ DebugP deP;
+public:
+ topology topo;
+ int ssl, first, nodes, *forwP;
+
+ ConstellationC();
+ ConstellationC(topology, int);
+ int run();
+};
+typedef ConstellationC *ConstellationP;
+
+class NodeC : public ConstellationC { // attributes and operations of one node of constellation (mash or ring)
+public:
+ DebugP deP;
+ int locPort;
+ int last;
+ int kicker;
+ int closing;
+ thread closingThread; // thread to close client side sockets
+ DataC data;
+ int len;
+ int ssc;
+ SocketP cliSides;
+ SocketP srvSides;
+ SSL_CTX *ctxP;
+
+ NodeC(ConstellationP, int);
+ int run();
+ void mainLoop();
+ void bindN();
+ void conn(int, int);
+ void acc(int);
+ void closeSocket(int, nodeside);
+ int getN(int);
+ int readN(int);
+ int putN(int);
+ int writeN(int);
+ int next_node();
+ void forward(int);
+ void closeClients();
+};
+typedef NodeC *NodeP;
+
+extern char *gpa(struct sockaddr *);
+extern void gai(int, struct addrinfo *ai, DebugP);
+#define GAI(level, ai) gai(level, ai, deP)
+extern void ssl_err(char*);
+extern void abend();
+
+#define SYSERR(e) LOG(0, "%s: %s (%d)", (char*)e, strerror(errno), errno), abend(deP)
+#define SSLERR(e) ssl_err((char*)e, deP), abend(deP)
+#define SOFTERR(e) LOG(0, e), fflush(stderr)
+#define HARDERR(e) SOFTERR(e), abend(deP);
+
+extern CSP csP;
+
+#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CScpp/Debug.cpp Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,80 @@
+#include <sys/time.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/mman.h>
+#include <semaphore.h>
+#include <sys/types.h>
+#include <signal.h>
+#include <execinfo.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/wait.h>
+#include "Debug.h"
+#define ERR(e) perror(e), kill(0, SIGTERM), exit(EXIT_FAILURE)
+
+// independent utility staff
+
+int DebugC::debug = 0;
+char *DebugC::prg_name;
+long int DebugC::t0;
+sem_t *DebugC::semP;
+
+void DebugC::deb(int level) {
+ if (debug >= level) {
+ if(sem_wait(semP)< 0) ERR("LOG sem_wait");
+ struct timeval t;
+ gettimeofday(&t, NULL);
+ fprintf(stderr, "%09ld %s: %s\n", 1000000*t.tv_sec+t.tv_usec-t0, debid, s);
+ fflush(stderr);
+ if(sem_post(semP) < 0) ERR("LOG sem_post");
+ }
+}
+void DebugC::back_trace() {
+ if(sem_wait(semP)< 0) ERR("LOG sem_wait");
+
+ int j, nptrs;
+ void *buffer[24];
+ char **strings;
+
+ nptrs = backtrace(buffer, 24);
+
+ strings = backtrace_symbols(buffer, nptrs);
+ if (strings == NULL) {
+ ERR("ABORT backtrace_symbols");
+ exit(EXIT_FAILURE);
+ }
+
+ char shcmd[] = "sed -e 's/.*\\[\\(.*\\)\\]$/\\1/' | addr2line -fspe ";
+ char *cmdbuf = (char*)malloc(sizeof(shcmd) + 64);
+ strcpy(cmdbuf, shcmd);
+ strcat(cmdbuf, prg_name);
+ #define SIZE 640
+ char *resbuf = (char*)malloc(SIZE);;
+ for(j = 0; j < nptrs; j++) {
+ int pc[2], po[2];
+ if(pipe(pc) < 0) ERR("ABORT pipe");
+ if(pipe(po) < 0) ERR("ABORT pipe");
+ if(!fork()) {
+ close(po[0]); dup2(po[1],1);
+ close(pc[1]); dup2(pc[0],0);
+ execl("/bin/bash", "/bin/bash", "-c", cmdbuf, (char *) NULL);
+ }
+ if(write(pc[1], strings[j], strlen(strings[j])) < 0) ERR("ABORT write pipe");
+ if(write(pc[1], "\n", 1) < 0) ERR("ABORT write pipe");
+ close(pc[1]);
+ memset(resbuf, 0, SIZE);
+ if(read(po[0], resbuf, SIZE)) fprintf(stderr, "%s", resbuf);
+ for(int i = 0; i < 2; i++) close(po[i]), close(pc[i]);
+ }
+ if(sem_post(semP)< 0) ERR("LOG sem_post");
+ free(strings); free(cmdbuf); free(resbuf);
+}
+void DebugC::debug_init(char *prgname) {
+ struct timeval t;
+ DebugC::prg_name = prgname;
+ gettimeofday(&t, NULL);
+ DebugC::t0 = 1000000*t.tv_sec+t.tv_usec;
+ if((DebugC::semP = (sem_t*)mmap(NULL, sizeof(sem_t), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0)) < 0) ERR("mmap");
+ if(sem_init(DebugC::semP, 1, 1) < 0) ERR("LOG sem_init");
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CScpp/Debug.h Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,25 @@
+#ifndef _DebugH_
+#define _DebugH_ 1
+
+class DebugC { // independent utility staff
+ public:
+ static int debug; // debug log level
+ static char *prg_name; // not used at the moment
+ static long int t0;
+ static sem_t semaphore; // semafor for log timestamping
+ static sem_t *semP; // -->semafor for log timestamping
+ static void debug_init(char *prgname);
+
+ char s[256]; // debug msg workspace
+ char debid[128]; // debug ID of process
+
+ void pre(char *o);
+ void deb(int level);
+ void err(int level, char *o);
+ void back_trace();
+};
+typedef DebugC *DebugP;
+
+#define DEBID(...) sprintf(deP->debid, __VA_ARGS__)
+#define LOG(level, ...) sprintf(deP->s, __VA_ARGS__), deP->deb(level)
+#endif
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CScpp/Makefile Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,20 @@
+#O := CS.o Debug.o RingNode.o MashNode.o N.o Node.o Partner.o Payload.o Data.o
+O := CS.o Debug.o
+H := CS.h
+
+CPPFLAGS += -Wall -D_GNU_SOURCE -lc -lpthread -lrt -lssl -lcrypto
+
+.PHONY: all clean
+all: CS
+
+CS: $(O)
+ g++ $(CPPFLAGS) $(O) -o $@
+
+%: %.cpp
+ g++ $(CPPFLAGS) -o $@ $<
+
+%.o: %.cpp $(H)
+ g++ -c $(CPPFLAGS) -o $@ $<
+
+clean:
+ rm -f CS $(O)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSj/CS.java Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,1135 @@
+import java.util.*;
+import java.io.*;
+import java.net.*;
+import javax.net.ssl.*;
+import java.security.*;
+import java.nio.*;
+import java.nio.channels.*;
+import java.awt.*;
+import java.awt.image.*;
+import java.awt.event.*;
+import javax.swing.*;
+import javax.swing.border.*;
+
+class Debug {
+ static int debug;
+ String debid = "";
+ Debug(Debug d) { debid = d.debid; }
+ Debug(String debid) { this.debid = debid; }
+ Debug() {}
+ static void pRe(Object o) {
+ System.err.println((new Date().getTime()) + " " + o);
+ System.err.flush();
+ }
+ void log(int level, Object o) {
+ if(debug == 7) { if(level == 7) pRe(debid + ": " + o); }
+ else if(debug >= level) pRe(debid + ": " + o);
+ }
+ boolean abendMsg(String msg, Exception x) {
+ log(0, "ABEND: " + msg + (x == null ? "" : (": " + x.getClass().getSimpleName() + ": " + x.getMessage())));
+ return false;
+ }
+ String prBuf(ByteBuffer b) { return b.position() + "/" + b.remaining() + "/" + b.limit(); }
+}
+class Data extends Debug implements Serializable {
+ static final long serialVersionUID = 42;
+ class DataObj implements Serializable {
+ static final long serialVersionUID = 42;
+ String text;
+ int ttl;
+ int lport;
+ int rport;
+ DataObj(String s, int ttl) {
+ this.text = s;
+ this.ttl = ttl;
+ this.lport = 0;
+ this.rport = 0;
+ }
+ }
+ ByteBuffer buf;
+ DataObj data;
+ Data(Debug deb, String s, int ttl) throws Exception {
+ super(deb.debid + " DATA");
+ data = new DataObj(s, ttl);
+ if(CS.fake != 0 && debid.equals("DEMO SSL MASH DATA")) throw new Exception("faked error"); // <-----------------------------------------------
+ load();
+ }
+ Data(Debug deb, ByteBuffer b) throws Exception {
+ super(deb);
+ debid += " DATA";
+ log(5, "unloading data objects from buffer...");
+ unload();
+ }
+ Data(Debug deb) {
+ super(deb);
+ debid += " DATA";
+ }
+ void load() throws IOException {
+ log(5, "loading data objects to buffer...");
+ ByteArrayOutputStream bas = new ByteArrayOutputStream();
+ try {
+ new ObjectOutputStream(bas).writeObject(data);
+ buf = ByteBuffer.wrap(bas.toByteArray());
+ } catch(IOException x) { abendMsg("data buffer " + data + " load error", x); throw new IOException(x); }
+ }
+ void unload() throws Exception {
+ buf.rewind();
+ data = (DataObj) new ObjectInputStream(new ByteArrayInputStream(buf.array())).readObject();
+ }
+ int dttl() {
+ return --data.ttl;
+ }
+ int ttl() {
+ return data.ttl;
+ }
+ boolean equals(Data data) {
+ return this.data.text.equals(data.data.text);
+ }
+ public String toString() {
+ return "data: ttl=" + data.ttl + ", text=" +
+ (data.text.length() < 24 ? data.text : data.text.substring(0, 8) + "-------" + data.text.substring(data.text.length() - 8));
+ }
+}
+class Node extends Debug implements Runnable {
+ boolean kicker, closing = false;
+ int thisNode, nextNode, closingThreshHold = 0;
+ Data data;
+ Selector selector;
+ ServerSocketChannel ssc;
+ SelectionKey ssk;
+ SSLContext sslC = null;
+ HashMap<Integer, SockIO> cliSide = new HashMap<Integer, SockIO>();
+ HashMap<SelectionKey, SockIO> srvSide = new HashMap<SelectionKey, SockIO>();
+ Constellation con;
+ Node(Constellation con, int port) {
+ super(con);
+ debid = (con.ssl ? "" : "non") + "SSL " + (con.mash ? "mash" : "ring") + " node " + port;
+ log(4, "initalizing...");
+ try {
+ this.con = con;
+ thisNode = port;
+ kicker = (thisNode == con.first);
+ if(con.ssl) try { prepareSSL(); } catch(Exception x) { abend("SSL preparation", x); throw new Exception(); }
+ try { bind(); } catch(Exception x) { abend("bind", x); throw new Exception(); }
+ data = new Data(this);
+ data.buf = ByteBuffer.allocate(con.data.buf.limit());
+ log(5, "initalized");
+ } catch(Exception x) { abend("initialization", x); }
+ }
+ public void run() {
+ runLoop();
+ log(2, "ended");
+ synchronized(con.glob) { con.glob.spawned--; con.glob.notify(); }
+ }
+ public void runLoop() {
+ if(con.glob.doForward && kicker) { // kick off
+ log(1, "ready to initial send (" + con.data.buf.limit() + " bytes) " + con.data.toString());
+ data.buf.put(con.data.buf); data.buf.flip();
+ try { data.unload(); data.load(); } catch(Exception x) {};
+ doForward();
+ }
+ //if(con.glob.doForward && !con.glob.abend) do { // main loop
+ if(con.glob.doForward) do { // main loop
+ int k = 0;
+ log(5, "main loop, waiting for " + (con.glob.doForward ? "data" : "close") +
+ " from " + srvSide.size() + " open sockets, timeout=" + CS.selTO + " msecs, closingThreshHold=" + closingThreshHold);
+ if(!con.glob.doForward && closingThreshHold == 0) closingThreshHold = 5000 / CS.selTO;
+ try {
+ try { while((k = selector.select(CS.selTO)) == 0 && con.glob.doForward) {}
+ } catch(Exception x) { abend("socket channel select", x); throw new Exception(x); }
+ if(k > 0) {
+ for(Iterator<SelectionKey> ski = selector.selectedKeys().iterator(); ski.hasNext();) {
+ SelectionKey sk = ski.next();
+ if(sk.isAcceptable())
+ try { acc(); } catch(Exception x) { abend("main loop accept error", x); throw new Exception(x); }
+ if(sk.isReadable())
+ try { forward(sk); } catch(Exception x) { abend("main loop forward error", x); throw new Exception(x); }
+ ski.remove();
+ }
+ }
+ } catch (Exception x) { abend("main loop", x); }
+ if(con.glob.stop) {
+ log(1, "constellation stop, closing all connections");
+ stop(); return;
+ }
+ } while(con.glob.doForward || ((srvSide.size() > 0) && (closingThreshHold-- > 0)));
+ //} while((con.glob.doForward || (srvSide.size() > 0)) && !con.glob.abend);
+ log(5, "closing ssc...");
+ try { ssc.close(); } catch (Exception x) { abend("ssc close", x); }
+ if(kicker && !con.glob.stop && !con.glob.abend) con.glob.abend = !data.equals(con.data);
+ }
+ private void forward(SelectionKey sk) {
+ log(5, "entering 'forward'...");
+ SockIO si = srvSide.get(sk);
+ if(!si.get()) {
+ stop();
+ si.close();
+ srvSide.remove(sk);
+ }
+ else {
+ log(5, "received " + prBuf(data.buf));
+ try { data.unload(); } catch(Exception x) { abend("data unload at forwarding", x); return; }
+ if(kicker) {
+ log(3, "received from " + (con.mash ? "mash: " : "ring: ") + data.toString());
+ if(data.dttl() <= 0) {
+ if(CS.isGui) con.cBox.ttl = data.ttl();
+ log(1, "TTL 0 reached, received " + data.toString() + ", leaving 'forward' closing all connections");
+ stop(); return;
+ }
+ try { data.load(); } catch(Exception x) { abend("data load at forwarding", x); return; }
+ }
+ if(con.glob.doForward) doForward();
+ }
+ log(4, "leaving 'forward'");
+ }
+ private void doForward() {
+ if((nextNode = chooseNextNode()) == 0) { stop(); return; }
+ log(3, "forwarding data to " + nextNode);
+ if(CS.pacing) pacing(nextNode);
+ if(CS.pace > 0) try { Thread.sleep(CS.pace); } catch(InterruptedException i) {};
+ if(!cliSide.get(nextNode).put()) stop();
+ }
+ private int chooseNextNode() {
+ int next;
+ if(con.mash) while((next = (con.first + CS.r.nextInt(con.nodes))) == thisNode);
+ else { next = thisNode + 1; if(next > con.last) next = con.first; }
+ if(cliSide.containsKey(next)) return next;
+ else return (conn(next) ? next : 0);
+ }
+ private void pacing(int nextNode) {
+ log(4, "constls.pacing");
+ synchronized(CS.constls) {
+ if(!con.glob.pacingGo) try { CS.constls.wait(); } catch(InterruptedException x) {}
+ if(nextNode > 0) {
+ con.cBox.currLink[0] = thisNode - con.first;
+ con.cBox.currLink[1] = nextNode - con.first;
+ }
+ con.cBox.ttl = data.ttl();
+ log(4, "constls.notify"); CS.constls.notify();
+ con.glob.pacingGo = false;
+ }
+ }
+ public void stop() {
+ con.glob.doForward = false;
+ synchronized(con) { con.notify(); }
+ closeNode();
+ }
+ private void abend(String msg, Exception x) {
+ abendMsg(msg, x);
+ con.glob.abend = true;
+ CS.isRun = false;
+ stop();
+ }
+ private void closeNode() {
+ if(!closing) new Thread(new closeClients(this)).start();
+ closing = true;
+ }
+ private class closeClients extends Debug implements Runnable {
+ closeClients(Debug deb) { this.debid = deb.debid + " clients CLOSE"; }
+ public void run() {
+ log(4, "starting...");
+ for(Iterator<Integer> i = cliSide.keySet().iterator(); i.hasNext();) {
+ int port = i.next();
+ log(4, "closing conn to " + port);
+ cliSide.get(port).close();
+ }
+ log(4, "end");
+ }
+ }
+ private void prepareSSL() throws Exception {
+ char[] passphrase = "passphrase".toCharArray();
+ KeyStore ks = KeyStore.getInstance("JKS");
+ String ksFile = (System.getenv("KSF") == null) ? CS.cePath + "/testkeys" : System.getenv("KSF");
+ FileInputStream kfs = new FileInputStream(ksFile);
+ ks.load(kfs, passphrase);
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+ kmf.init(ks, passphrase);
+ KeyStore ts = KeyStore.getInstance("JKS");
+ FileInputStream tfs = new FileInputStream(ksFile);
+ ts.load(tfs, passphrase);
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+ tmf.init(ts);
+ sslC = SSLContext.getInstance("TLS");
+ sslC.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+ kfs.close();
+ tfs.close();
+ }
+ private void bind() throws IOException {
+ log(5, "binding...");
+ selector = Selector.open();
+ ssc = ServerSocketChannel.open();
+ ssc.configureBlocking(false);
+ ssc.socket().bind(new InetSocketAddress(thisNode), 1024);
+ ssk = ssc.register(selector, SelectionKey.OP_ACCEPT);
+ log(2, "bound to " + thisNode);
+ }
+ private boolean conn(int remPort) {
+ int retry = CS.connThreshold;
+ boolean connected = false;
+ SocketChannel sc = null;
+ log(4, "connecting to " + remPort + ", timeout=" + (CS.connThreshold*CS.connTO)/1000 + " secs. ...");
+ while(!connected && retry-- > 0 && con.glob.doForward) {
+ try {
+ sc = SocketChannel.open(new InetSocketAddress("localhost", remPort));
+ connected = true;
+ } catch(Exception x) {
+ if(x.getMessage().equals("Connection refused")) {
+ try { Thread.sleep(CS.connTO); } catch(InterruptedException i) {}
+ } else { abendMsg("connection to " + remPort, x); return false; }
+ }
+ }
+ if(!con.glob.doForward) return false;
+ if(connected) {
+ try { cliSide.put(remPort, new SockIO(this, sc, false));
+ } catch(Exception x) { abendMsg("connection to " + remPort, x); return false; }
+ log(2, "connected after " + (CS.connThreshold - retry + 1) + " retries");
+ return true;
+ }
+ else { abendMsg("connection to " + remPort + " timeout", null); return false; }
+ }
+ private void acc() throws Exception {
+ try {
+ SocketChannel sc = ssc.accept();
+ sc.configureBlocking(false);
+ SockIO si = new SockIO(this, sc, true);
+ SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
+ srvSide.put(sk, si);
+ CS.connCnt++;
+ log(2, "connection accepted");
+ } catch(Exception x) { abend("connection accept", x); throw new Exception(x); }
+ }
+ private class SockIO extends Debug {
+ SocketChannel sc;
+ Selector handshakeSelector;
+ SelectionKey sk;
+ Runnable ru;
+ SSLSession session;
+ SSLEngine e;
+ SSLEngineResult r;
+ ByteBuffer ci, co, ib;
+ boolean wrapper;
+ SockIO(Debug deb, SocketChannel sc, boolean server) throws Exception {
+ this.debid = deb.debid + " socket " + (server ? "(server)" : "(client)");
+ log(5, "initializing...");
+ this.sc = sc;
+ if(con.ssl) {
+ handshakeSelector = Selector.open();
+ sc.configureBlocking(false);
+ sk = sc.register(handshakeSelector, SelectionKey.OP_READ);
+ e = sslC.createSSLEngine();
+ session = e.getSession();
+ int am = session.getApplicationBufferSize();
+ int pm = session.getPacketBufferSize();
+ ib = ByteBuffer.allocate(am); // pišvejcova konstanta
+ co = ByteBuffer.allocateDirect(pm);
+ ci = ByteBuffer.allocateDirect(pm);
+ if(server) {
+ e.setUseClientMode(false);
+ e.setNeedClientAuth(true);
+ } else e.setUseClientMode(true);
+ }
+ log(4, "initialized");
+ }
+ String prBuf(ByteBuffer b) { return b.position() + "/" + b.remaining() + "/" + b.limit() + "/" + b.capacity(); }
+ private String eStat() {
+ String stat;
+ if (r != null)
+ stat = r.getStatus() + "/" + r.getHandshakeStatus() + "/" + e.getHandshakeStatus() +
+ ", bytes: " + r.bytesConsumed() + "/" + r.bytesProduced();
+ else stat = "-/-/" + e.getHandshakeStatus() + " -/-";
+ return stat;
+ }
+ //-- result status
+ private boolean isOK() {
+ return r.getStatus() == SSLEngineResult.Status.OK; }
+ private boolean isClosed() {
+ return r.getStatus() == SSLEngineResult.Status.CLOSED; }
+ private boolean isBad() {
+ return r.getStatus() == SSLEngineResult.Status.BUFFER_OVERFLOW; }
+ //-- result handshake status
+ private boolean handShake() {
+ return r.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING; }
+ private boolean handShakeEnd() {
+ return r.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED; }
+ private boolean needTask() {
+ return r.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK; }
+ //-- engine handshake status
+ private boolean needUnwrap() {
+ return e.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP; }
+ private boolean needWrap() {
+ return e.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_WRAP; }
+ private int read(SocketChannel sc, ByteBuffer b) {
+ int n = -1, k = 0;
+ log(5, "read, " + prBuf(b) + " ...");
+ if(con.ssl) {
+ try { while((k = handshakeSelector.select(CS.selTO)) == 0) { if(!con.glob.doForward) break; }
+ } catch (Exception x) { abendMsg("handshake select", x); k = 0; }
+ if(k>0 && sk.isReadable()) {
+ try { n = sc.read(b); } catch(Exception x) { abendMsg("socket channel read", x); n = -1; }
+ handshakeSelector.selectedKeys().remove(sk);
+ }
+ } else
+ try { while(b.hasRemaining()) { n += sc.read(b); if(n < 0) break; }; n++;
+ } catch(Exception x) { abendMsg("socket channel read", x); n = -1; };
+ log(4, "read " + n);
+ return n;
+ }
+ private int write(SocketChannel sc, ByteBuffer b) {
+ log(5, "writing " + prBuf(b) + " ...");
+ int n = 0;
+ try { n = sc.write(b); } catch(Exception x) { abendMsg("socket channel write", x); n = -1; }
+ log(4, "written " + n);
+ return n;
+ }
+ private boolean replenish() {
+ log(5, "replenishing ci...");
+ ci.clear();
+ int n = read(sc, ci);
+ ci.flip();
+ return (n >= 0);
+
+ }
+ void handleUnWrapStatus() {
+ ByteBuffer b;
+ if(r.getStatus() == SSLEngineResult.Status.BUFFER_OVERFLOW) {
+ log(5, "unwrap: ib BUFFER_OVERFLOW " + prBuf(ib));
+ if(ib.position() > 0) { ib.flip(); data.buf.put(ib); ib.clear(); }
+ else {
+ b = ByteBuffer.allocate((int)(1.25 * ib.capacity()));
+ ib.flip(); b.put(ib); ib = b; }
+ log(5, "ib " + prBuf(ib));
+ }
+ if(r.getStatus() == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
+ int n;
+ log(5, "unwrap: ci BUFFER_UNDERFLOW " + prBuf(ci));
+ if(ci.limit() < ci.capacity()) {
+ ci.mark(); ci.position(ci.limit()); ci.limit(ci.capacity());
+ n = read(sc, ci); ci.limit(ci.position()); ci.reset(); }
+ else {
+ n = ci.capacity(); if(ci.position() == 0) n *= 2;
+ b = ByteBuffer.allocate(n); b.put(ci); ci = b;
+ n = read(sc, ci); ci.flip(); }
+ log(5, "additional " + n + " bytes read: " + prBuf(ci)); }
+ }
+ boolean doWrap() {
+ log(5, "entering " + "wrap, ob: " + prBuf(data.buf) + ", co: " + prBuf(co) + " ...");
+ try { r = e.wrap(data.buf, co); } catch (Exception x) { abend("SSL engine wrap", x); return false; }
+ //if(isBad()) { con.glob.abend = true; return abendMsg("SSL engine status: " + r.getStatus().toString(), null); }
+ if(isBad()) { abend("SSL engine status: " + r.getStatus().toString(), null); return false; }
+ log(4, "after " + (wrapper ? "wrapper" : "unwrapper") + " wrap: " + eStat() + ", ob: " + prBuf(data.buf) + ", co: " + prBuf(co));
+ if(isClosed()) return false;
+ return true;
+ }
+ private boolean wrap() {
+ do {co.clear();
+ if(!doWrap()) return false;
+ if(needTask()) while((ru = e.getDelegatedTask()) != null) ru.run();
+ co.flip();
+ if(handShake() && (write(sc, co) == -1)) return false;
+ } while (needWrap());
+ if(needUnwrap() && !( replenish() && unwrap() )) return false;
+ log(5, (wrapper ? "wrapper" : "unwrapper") + " wrap HS finished=" + handShakeEnd());
+ if(wrapper && handShakeEnd()) {
+ co.clear();
+ if(!doWrap()) return false;
+ co.flip();
+ }
+ return true;
+ }
+ boolean doUnwrap() {
+ log(5, "entering " + "unwrap, ib: " + prBuf(ib) + ", ci: " + prBuf(ci) + " ...");
+ try { r = e.unwrap(ci, ib); } catch (Exception x) { abend("SSL engine unwrap", x); return false; }
+ log(4, "after " + (wrapper ? "wrapper" : "unwrapper") + " unwrap: " + eStat() + ", ib: " + prBuf(ib) + ", ci: " + prBuf(ci));
+ if(isClosed()) return false;
+ return true;
+ }
+ private boolean unwrap() {
+ do { // while( needUnwrap() )
+ do { // while( needUnwrap() && ci.hasRemaining() )
+ if(!doUnwrap()) return false;
+ if(needTask()) while((ru = e.getDelegatedTask()) != null) ru.run();
+ } while(needUnwrap() && ci.hasRemaining());
+ if(needUnwrap() && !replenish()) return false;
+ } while(needUnwrap());
+ if(needWrap() && !wrap()) return false;
+ if(!wrapper) {
+ if(handShakeEnd() && !replenish()) return false;
+ if(handShakeEnd() || !handShake()) {
+ handleUnWrapStatus();
+ while(ci.hasRemaining()) {
+ do {
+ if(!doUnwrap()) return false;
+ handleUnWrapStatus();
+ } while(!isOK());
+ }
+ }
+ }
+ return true;
+ }
+ boolean get() {
+ wrapper = false;
+ boolean got = false;
+ data.buf.clear();
+ if(con.ssl) {
+ while(data.buf.hasRemaining()) {
+ ib.clear();
+ int n;
+ ci.clear(); n = read(sc, ci); ci.flip();
+ if(n < 0) { got = false; break; }
+ else got = (unwrap() && !isClosed());
+ if(got) { ib.flip(); data.buf.put(ib); log(4, "partially got " + prBuf(data.buf)); }
+ else break;
+ }
+ }
+ else try { got = (read(sc, data.buf) >= 0); } catch(Exception x) { got = abendMsg("socket read", x); }
+ return got;
+ }
+ boolean put() {
+ wrapper = true;
+ boolean put = false;
+ log(4, "put: ob: " + prBuf(data.buf));
+ try {
+ do {
+ if(con.ssl) {
+ if(!wrap() || isClosed()) return false;
+ else put = (write(sc, co) >= 0);
+ } else put = (write(sc, data.buf) >= 0);
+ } while(put && data.buf.hasRemaining());
+ if(put) CS.forwCnt++;
+ } catch(Exception x) { put = abendMsg("socket write", x); }
+ return put;
+ }
+ void close() {
+ log(4, "terminating connection...");
+ try {
+ if(con.ssl) {
+ data.buf.put(ByteBuffer.wrap("".getBytes()));
+ e.closeOutbound();
+ wrap();
+ }
+ sc.close();
+ } catch(Exception x) {}
+ }
+ }
+}
+class Constellation extends Debug implements Runnable {
+ class Glob {
+ boolean doForward = true;
+ boolean pacingGo = false;
+ boolean stop = false;
+ boolean abend = false;
+ int D = 0;
+ int spawned = 0;
+ int from = 0, to = 0;
+ }
+ volatile Glob glob;
+ boolean mash, ssl;
+ int first, last, nodes;
+ Data data;
+ CS cs;
+ Gui.CBox cBox;
+ String label;
+ Constellation(CS cs, Gui.CBox cBox, boolean mash, boolean ssl) {
+ label = (ssl ? "" : "non") + "SSL " + (mash ? "MASH" : "RING");
+ debid = "DEMO " + label;
+ this.cs = cs;
+ this.cBox = cBox;
+ this.mash = mash;
+ this.ssl = ssl;
+ if(mash) { first = CS.mp0; nodes = CS.mn; }
+ else { first = CS.rp0; nodes = CS.rn; }
+ glob = new Glob();
+ }
+ Constellation(Constellation con) {
+ super(con);
+ this.cs = con.cs;
+ this.mash = con.mash;
+ this.ssl = con.ssl;
+ this.first = con.first;
+ this.last = con.last;
+ this.nodes = con.nodes;
+ this.glob = con.glob;
+ }
+ void stop() { glob.stop = true; }
+ //void reset() {
+ //first = mash ? CS.mp0 : CS.rp0;
+ //first += nodes;
+ //glob.doForward = true;
+ //glob.abend = false;
+ //glob.spawned = 0;
+ //}
+ public void run() {
+ log(4, "starting");
+ glob.pacingGo = true;
+ runNodes();
+ synchronized(CS.constls) { CS.constls.notify(); }
+ synchronized(cs) {
+ if(--CS.notFinished == 0) cs.notifyAll();
+ else try { cs.wait(); } catch(InterruptedException e) {};
+ }
+ log(1, glob.abend ? "BAD: constellation not finished correctly" : "OK, constellation finished correctly");
+ if(glob.abend) CS.abend = true;
+ CS.spawned--;
+ synchronized(cs) { cs.notify(); }
+ }
+ void runNodes() {
+ log(1, nodes + " nodes constellation, ttl=" + CS.pttl + " starting...");
+ first += (ssl ? 500 : 0);
+ last = first + nodes - 1;
+ if(data == null)
+ try { data = new Data(this, CS.text, CS.pttl);
+ } catch(Exception x) { abendMsg("creating initial data", x); return; }
+ synchronized(glob) {
+ for(int port = first; (port < first + nodes); port++) {
+ try {
+ new Thread(new Node(this, port), "Node " + port).start();
+ log(3, "node " + port + " established");
+ glob.spawned++;
+ } catch(Exception x) { glob.doForward = false; glob.abend = true; break; }
+ }
+ if(glob.doForward) log(2, "all nodes established");
+ while(glob.spawned > 0) try { glob.wait(); } catch(InterruptedException e) {};
+ }
+ log(2, "all nodes finished");
+ }
+}
+class Gui extends Debug implements Runnable {
+ class Parms extends JPanel {
+ static final long serialVersionUID = 43;
+ class Parm implements ActionListener {
+ JComboBox<Number> valueEntry;
+ JLabel valueLabel;
+ Parm(Number[] values, Number value, String label, boolean editable, boolean rowEnd) {
+ log(5, "parm " + label);
+ valueLabel = new JLabel(label);
+ valueLabel.setBorder(b);
+ if(orientation == HORIZONTAL) gridC.gridwidth = 1;
+ else gridC.gridwidth = GridBagConstraints.REMAINDER;
+ gridL.setConstraints(valueLabel, gridC);
+ add(valueLabel);
+ valueEntry = new JComboBox<Number>(values);
+ valueEntry.setPreferredSize(new Dimension(prefComboWidth, prefComboHeight));
+ if(value != null) valueEntry.setSelectedItem(value);
+ valueEntry.setEditable(editable);
+ valueEntry.addActionListener(this);
+ if(orientation == HORIZONTAL && rowEnd) gridC.gridwidth = GridBagConstraints.REMAINDER;
+ gridL.setConstraints(valueEntry, gridC);
+ add(valueEntry);
+ }
+ public void actionPerformed(ActionEvent e) {
+ try { setVal((Number)((JComboBox)e.getSource()).getSelectedItem()); } catch(Exception x) { log(0, x.getMessage()); }
+ }
+ void getEnv() {}
+ void setVal(Number v) {}
+ }
+ class Buttons extends Box {
+ class GoButton extends JButton implements ActionListener {
+ static final long serialVersionUID = 44;
+ GoButton() {
+ super("go");
+ addActionListener(this);
+ gridC.gridwidth = 1;
+ gridL.setConstraints(this, gridC);
+ CS.go = true;
+ }
+ public void actionPerformed(ActionEvent e) {
+ log(5, getText() + " button pressed");
+ if(getText().equals("go")) { setText("pause"); CS.go = true; CS.pacing = true; CS.isRun = true; awake(); }
+ else { setText("go"); CS.go = false; }
+ }
+ }
+ class StepButton extends JButton implements ActionListener {
+ static final long serialVersionUID = 44;
+ StepButton() {
+ super("step");
+ addActionListener(this);
+ gridC.gridwidth = 1;
+ gridL.setConstraints(this, gridC);
+ }
+ public void actionPerformed(ActionEvent e) {
+ log(5, getText() + " button pressed");
+ CS.go = false; CS.pacing = true; CS.isRun = true;
+ setGo();
+ awake();
+ }
+ }
+ class ResetButton extends JButton implements ActionListener {
+ static final long serialVersionUID = 45;
+ ResetButton() {
+ super("reset");
+ addActionListener(this);
+ gridC.gridwidth = 1;
+ gridL.setConstraints(this, gridC);
+ }
+ public void actionPerformed(ActionEvent e) { CS.isRun = false; CS.isReset = true; CS.go = true; awake(); }
+ }
+ class StopButton extends JButton implements ActionListener {
+ static final long serialVersionUID = 46;
+ StopButton() {
+ super("end");
+ addActionListener(this);
+ gridC.gridwidth = 1;
+ //gridC.gridwidth = GridBagConstraints.REMAINDER;
+ gridL.setConstraints(this, gridC);
+ }
+ public void actionPerformed(ActionEvent e) { closeUI(); }
+ }
+ GoButton go;
+ ResetButton reset;
+ StepButton step;
+ StopButton stop;
+ Box row1, row2;
+ Buttons() {
+ super(BoxLayout.Y_AXIS);
+ setBorder(b);
+ add(row1 = new Box(BoxLayout.X_AXIS));
+ add(row2 = new Box(BoxLayout.X_AXIS));
+ row1.add(go = new GoButton());
+ row1.add(step = new StepButton());
+ row2.add(reset = new ResetButton());
+ row2.add(stop = new StopButton());
+ }
+ void setGo() { go.setText("go"); }
+ void setPause() { go.setText("pause"); }
+ void enableStep(boolean b) { step.setEnabled(b); }
+ }
+ static final boolean EDITABLE = true;
+ static final boolean ROW_END = true;
+ boolean orientation;
+ EmptyBorder b = new EmptyBorder(0,7,0,7);
+ GridBagLayout gridL = new GridBagLayout();
+ GridBagConstraints gridC = new GridBagConstraints();
+ int prefComboWidth, prefComboHeight;
+ Buttons buttons;
+ Parms(boolean orientation) {
+ textHeight = (int)Math.round(1.5 * getFontMetrics(getFont()).getHeight());
+ prefComboWidth = (int)Math.round(1.5 * getFontMetrics(getFont()).bytesWidth("000000".getBytes(), 0, 6));
+ prefComboHeight = textHeight;
+ this.orientation = orientation;
+ gridC.fill = GridBagConstraints.BOTH;
+ setFont(new Font("SansSerif", Font.PLAIN, 9));
+ setLayout(gridL);
+ new Parm(new Integer[] {0,1,2,3,4,5,7,9}, new Integer(CS.debug), "Debug level", !EDITABLE, !ROW_END) {
+ void setVal(Number v) { CS.debug = v.intValue(); } };
+ new Parm(new Integer[] {0,1,2}, new Integer(CS.issl), "SSL", !EDITABLE, !ROW_END) {
+ void setVal(Number v) { CS.issl = v.intValue(); } };
+ new Parm(new Integer[] {CS.pttl}, null, "TTL", EDITABLE, ROW_END) {
+ void setVal(Number v) { CS.pttl = v.intValue(); } };
+ new Parm(new Double[] {(double)CS.ipace/1000}, null, "pace in secs.", EDITABLE, !ROW_END) {
+ void setVal(Number v) { CS.ipace = (int)(1000.0 * v.doubleValue()); CS.pace = CS.ipace; } };
+ new Parm(new Integer[] {CS.mp0}, null, "listen port of first MASH node", EDITABLE, !ROW_END) {
+ void setVal(Number v) { CS.mp0 = v.intValue(); } };
+ new Parm(new Integer[] {CS.rp0}, null, "listen port of first RING node", EDITABLE, ROW_END) {
+ void setVal(Number v) { CS.rp0 = v.intValue(); } };
+ new Parm(new Integer[] {CS.mn}, null, "MASH constellation size", EDITABLE, !ROW_END) {
+ void setVal(Number v) { CS.mn = v.intValue(); } };
+ new Parm(new Integer[] {CS.rn}, null, "RING constellation size", EDITABLE, !ROW_END) {
+ void setVal(Number v) { CS.rn = v.intValue(); } };
+ new Parm(new Double[] {(double)CS.selTO/1000}, null, "I/O selection timeout in secs.", EDITABLE, ROW_END) {
+ void setVal(Number v) { CS.selTO = 1000 * v.intValue(); } };
+ new Parm(new Integer[] {CS.fake}, null, "point of faked exception (integer)", EDITABLE, !ROW_END) {
+ void setVal(Number v) { CS.fake = v.intValue(); } };
+ new Parm(new Integer[] {CS.rs}, null, "random seed (integer)", EDITABLE, !ROW_END) {
+ void setVal(Number v) { CS.rs = v.intValue(); } };
+ //gridC.gridwidth = 0;
+ gridC.gridwidth = GridBagConstraints.REMAINDER;
+ add(buttons = new Buttons());
+ }
+ }
+ class CBoxBg extends BufferedImage {
+ final int nodeC[][]; // node centers
+ CBoxBg(int n) {
+ super(cBoxSize , cBoxSize, BufferedImage.TYPE_INT_RGB);
+ nodeC = new int[n][2];
+ log(5, "cnstlltn bg image beg, node centers array length=" + nodeC.length);
+ final double a0 = Math.PI / 2;
+ final double aN = 2 * Math.PI / n;
+ final int b = 3; // border
+ final int r = 5; // node diameter
+ int cx, cy; // constellation center coordinates
+ cx = cy = cBoxSize/2;
+ int R = cBoxSize/2 - r - 2*b; // distance of node centers from constellation center
+ int dx, dy; // deltas of node center coordinates
+ final Graphics2D g2 = (Graphics2D)this.getGraphics();
+ g2.setBackground(Color.WHITE);
+ g2.clearRect(0, 0, cBoxSize, cBoxSize);
+ g2.setColor(Color.BLACK);
+ g2.draw3DRect(b, b, cBoxSize - 2*b, cBoxSize - 2*b, true);
+ if(n < 2) return;
+ for(int i=0; i<n; i++) {
+ dx = (int)Math.round(Math.cos(a0 + i * aN) * R);
+ dy = (int)Math.round(Math.sin(a0 + i * aN) * R);
+ nodeC[i][0] = dx; nodeC[i][1] = dy;
+ g2.drawOval(cx-dx-r, cy-dy-r, 2*r, 2*r);
+ }
+ log(5, "cnstlltn bg image end, node centers array length=" + nodeC.length);
+ }
+ }
+ class Arrow extends Polygon {
+ final double z, D, sin, cos, xd, yd, dx, dy;
+ final int x0, y0, x3, y3;
+ Arrow(int x1, int y1, int x2, int y2, int d) {
+ super();
+ z=d/(2*1.618034);
+ D = Math.sqrt(Math.pow(x2-x1, 2) + Math.pow(y2-y1, 2));
+ sin = (x2-x1)/D;
+ cos = (y2-y1)/D;
+ xd = x2-d*sin;
+ yd = y2-d*cos;
+ dx = z*cos;
+ dy = z*sin;
+ x0 = (int)Math.round(xd-dx);
+ y0 = (int)Math.round(yd+dy);
+ x3 = (int)Math.round(xd+dx);
+ y3 = (int)Math.round(yd-dy);
+ addPoint(x0, y0);
+ addPoint(x2, y2);
+ addPoint(x3, y3);
+ }
+ }
+ class CBox extends Box {
+ static final long serialVersionUID = 45;
+ class CHead extends JPanel {
+ final JLabel field = new JLabel();
+ CHead() {
+ setPreferredSize(new Dimension(cBoxSize, textHeight - 4));
+ add(field);
+ }
+ public void paint(Graphics g) {
+ super.paint(g);
+ field.setText(label + ttl);
+ }
+ }
+ class CPanel extends JPanel {
+ CPanel() { setPreferredSize(new Dimension(cBoxSize, cBoxSize)); }
+ public void paint(Graphics g) {
+ super.paint(g);
+ final Graphics2D g2 = (Graphics2D)g;
+ Polygon p;
+ g2.drawImage(bg, 0, 0, Color.WHITE, null);
+ final int n1 = currLink[0], n2 = currLink[1];
+ if(n1 > -1) {
+ final int x1 = cx-bg.nodeC[n1][0], y1 = cy-bg.nodeC[n1][1];
+ final int x2 = cx-bg.nodeC[n2][0], y2 = cy-bg.nodeC[n2][1];
+ log(5, label + " paint, n1=" + n1 + ", n2=" + n2 + ", nodeC.length=" + bg.nodeC.length);
+ g2.setColor(Color.CYAN);
+ g2.setStroke(new BasicStroke(2));
+ g2.drawLine(x1, y1, x2, y2);
+ g2.setColor(Color.BLUE);
+ g2.setStroke(new BasicStroke(0));
+ g2.drawPolygon(p = new Arrow(x1, y1, x2, y2, 12));
+ g2.fill(p);
+ }
+ }
+ }
+ volatile int[] currLink = {-1,-1};
+ volatile int ttl;
+ CBoxBg bg;
+ final int cx = cBoxSize / 2, cy = cx;
+ final String label;
+ CBox(String label, CBoxBg bg) {
+ super(BoxLayout.Y_AXIS);
+ setMaximumSize(new Dimension(cBoxSize, cBoxSize + textHeight));
+ this.bg = bg;
+ this.label = label + ", ttl=";
+ ttl = CS.pttl;
+ add(new CHead());
+ add(new CPanel());
+ }
+ void reset(CBoxBg bg) { currLink[0] = -1; currLink[1] = -1; ttl = CS.pttl; this.bg = bg; }
+ }
+ class CcBox extends Box {
+ CBox ringBox, mashBox;
+ CcBox(String label) {
+ super(BoxLayout.X_AXIS);
+ add(mashBox = new CBox("MASH " + label + " SSL", mashBg));
+ add(ringBox = new CBox("RING " + label + " SSL", ringBg));
+ }
+ }
+ CS cs;
+ JFrame ui = new JFrame(debid);
+ Container dashboard;
+ static final boolean HORIZONTAL = true;
+ static final boolean VERTICAL = false;
+ boolean dashboardLayout = HORIZONTAL;
+ int textHeight;
+ Parms parms;
+ Box resultBox = null;
+ CcBox sslBox = null, nonSslBox = null;
+ CBoxBg ringBg = null, mashBg = null;
+ CBox nonSslMashBox, nonSslRingBox, sslMashBox, sslRingBox;
+ int cBoxSize;
+ WindowAdapter uiLstnr = new WindowAdapter() {
+ public void windowOpened(WindowEvent e) { log(5, "window opened"); }
+ public void windowClosing(WindowEvent e) { closeUI(); }
+ public void windowClosed(WindowEvent e) { log(5, "window closed"); synchronized(CS.gui) { CS.gui.notify(); }
+ }
+ };
+ Gui(CS cs) {
+ super(cs);
+ this.cs = cs;
+ debid = cs.debid + " GUI";
+ log(5, "start parms panel");
+ ui.setDefaultCloseOperation(JFrame.DO_NOTHING_ON_CLOSE);
+ ui.addWindowListener(uiLstnr);
+ ui.setLocation(600, 100);
+ dashboard = ui.getContentPane();
+ dashboard.setFont(new Font("SansSerif", Font.PLAIN, 9));
+ dashboard.setLayout(new BoxLayout(dashboard, dashboardLayout == HORIZONTAL ? BoxLayout.X_AXIS : BoxLayout.Y_AXIS));
+ ui.add(parms = new Parms(!dashboardLayout));
+ ui.pack();
+ }
+ public void run() {
+ log(5, "repaint");
+ ui.setVisible(true);
+ ui.repaint();
+ }
+ void cboxes() {
+ log(5, "create constellation panels");
+ cBoxSize = (dashboardLayout == VERTICAL ? dashboard.getSize().width : dashboard.getSize().height)/2 - textHeight;
+ ringBg = new CBoxBg(CS.rn);
+ mashBg = new CBoxBg(CS.mn);
+ if(resultBox != null) dashboard.remove(resultBox);
+ dashboard.add(resultBox = new Box(BoxLayout.Y_AXIS));
+ if(CS.issl > 0) {
+ resultBox.add(sslBox = new CcBox("w/"));
+ sslMashBox = sslBox.mashBox;
+ sslRingBox = sslBox.ringBox;
+ }
+ if(CS.issl != 1) {
+ resultBox.add(nonSslBox = new CcBox("non"));
+ nonSslMashBox = nonSslBox.mashBox;
+ nonSslRingBox = nonSslBox.ringBox;
+ }
+ ui.pack();
+ }
+ void awake() { synchronized(CS.gui) { CS.gui.notify(); } }
+ void dashboardReset() {
+ //parms.buttons.setGo();
+ parms.buttons.enableStep(true);
+ }
+ void closeUI() {
+ log(5, "closing window");
+ CS.isGui = false; CS.isRun = false; CS.go = true; awake();
+ }
+}
+public class CS extends Debug {
+ volatile static int
+ connCnt = 0,
+ forwCnt = 0,
+ notFinished = 0,
+ spawned = 0;
+ volatile static boolean abend = false;
+ static String text = "bla bla";
+ static String clsPath;
+ static String cePath;
+ static final String sslPathSuffP = "../CS";
+ static Random r;
+ static int
+ mn = 0,
+ mp0 = 11000,
+ rn = 0,
+ rp0 = 12000,
+ pttl = 3,
+ issl = 0,
+ connThreshold = 77, // connection retries threshold
+ connTO = 99, // connection sleep time in msecs
+ selTO = 999, // selection timeout in msecs
+ ipace = 0,
+ pace = 0,
+ rs = 0,
+ fake = 0;
+ static boolean isGui = false, isRun = true, isReset = false, go = true, pacing = false;
+ static ArrayList<Constellation> constls;
+ public static Constellation nonSslMashCon, nonSslRingCon, sslMashCon, sslRingCon;
+ static Gui gui;
+ static Gui.CBox nonSslMashBox, nonSslRingBox, sslMashBox, sslRingBox;
+ CS() {
+ debid = "client server DEMO";
+ getArgs();
+ }
+ boolean isArg(String a) { return System.getenv(a) != null; }
+ int getArgI(String a) throws Exception {
+ int i = -1;
+ if(System.getenv(a) != null)
+ if(!System.getenv(a).equals(""))
+ try { i = Integer.valueOf(System.getenv(a));
+ } catch(NumberFormatException x) { throw new Exception(a + "=\terror in number format"); }
+ return i;
+ }
+ double getArgF(String a) throws Exception {
+ double d = -1;
+ if(System.getenv(a) != null)
+ if(!System.getenv(a).equals(""))
+ try { d = Double.valueOf(System.getenv(a));
+ } catch(NumberFormatException x) { throw new Exception(a + "=\terror in number format"); }
+ return d;
+ }
+ int getMArg(String a) throws Exception {
+ int i;
+ if((i = getArgI(a)) == 0) throw new Exception(a + " is mandatory");
+ return i;
+ }
+ void getArgs() {
+ try {
+ clsPath = getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+ if(getArgI("DEB") >= 0) debug = getArgI("DEB");
+ if(System.getenv("T") != null) text = System.getenv("T");
+ if(getArgI("TTL") > 0) pttl = getArgI("TTL");
+ if(getArgF("P") >= 0) ipace = (int)(getArgF("P") * 1000);
+ if(getArgI("MP0") > 0) mp0 = getArgI("MP0");
+ if(getArgI("RP0") > 0) rp0 = getArgI("RP0");
+ if(getArgI("N") >= 0) { mn = getArgI("N"); rn = mn; }
+ if(getArgI("SSL") >= 0) issl = getArgI("SSL");
+ if(System.getenv("CEP") != null) cePath = System.getenv("CEP");
+ else cePath = clsPath + sslPathSuffP;
+ if(getArgI("MN") >= 0) mn = getArgI("MN");
+ if(getArgI("RN") >= 0) rn = getArgI("RN");
+ if(getArgF("STO") >= 0) selTO = (int)(getArgF("STO") * 1000);
+ if(getArgI("FAKE") >= 0) fake = getArgI("FAKE");
+ if(isArg("RS")) rs = getArgI("RS");
+ if(isArg("G")) isGui = getArgI("G") == 1;
+ } catch(Exception x) { log(0, x.getMessage()); return; }
+ }
+ void dashboard() {
+ gui = new Gui(this);
+ log(4, "wait for args from GUI");
+ synchronized(gui) {
+ try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); }
+ try { gui.wait(); } catch(InterruptedException x) {}
+ }
+ if(isGui) cboxes();
+ }
+ void cboxes() {
+ log(4, "cboxes");
+ try { gui.cboxes(); } catch(Exception x) { log(0, x.getMessage()); }
+ try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); }
+ nonSslMashBox = gui.nonSslMashBox;
+ nonSslRingBox = gui.nonSslRingBox;
+ sslMashBox = gui.sslMashBox;
+ sslRingBox = gui.sslRingBox;
+
+ }
+ void constellations() {
+ pace = ipace;
+ spawned = 0; notFinished = 0;
+ constls = new ArrayList<Constellation>();
+ if(mn == 1) log(0, "one-node MASH configuration not implemented");
+ if(mn > 1) {
+ if(issl > 0) { sslMashCon = new Constellation(this, sslMashBox, true, true); constls.add(sslMashCon); }
+ if(issl != 1) { nonSslMashCon = new Constellation(this, nonSslMashBox, true, false); constls.add(nonSslMashCon); }
+ }
+ if(rn == 1) log(0, "one-node RING configuration not implemented");
+ if(rn > 1) {
+ if(issl > 0) { sslRingCon = new Constellation(this, sslRingBox, false, true); constls.add(sslRingCon); }
+ if(issl != 1) { nonSslRingCon = new Constellation(this, nonSslRingBox, false, false); constls.add(nonSslRingCon); }
+ }
+ }
+ void stop() {
+ if(sslMashCon != null) sslMashCon.stop();
+ if(sslRingCon != null) sslRingCon.stop();
+ if(nonSslMashCon != null) nonSslMashCon.stop();
+ if(nonSslRingCon != null) nonSslRingCon.stop();
+ pacing = false;
+ pace = 0;
+ go = true;
+ }
+ void reset() {
+ gui.dashboardReset();
+ cboxes();
+ mp0 += mn; rp0 += rn; // port is unusable 30 secs after port close due to special timeout
+ constellations();
+ isReset = false;
+ }
+ String switches(String label) {
+ return label + ": isGui=" + isGui + ", isRun=" + isRun + ", go=" + go + ", pacing=" + pacing + ", spawned=" + spawned;
+ }
+ void pacingGo() { for(Constellation con : constls) con.glob.pacingGo = true; }
+ void runGuiCon() {
+ synchronized(constls) {
+ while(isGui && isRun && spawned > 0) {
+ log(4, switches("runCons constls.wait"));
+ try { constls.wait(); } catch(InterruptedException x) {}
+ pacingGo();
+ log(4, switches("runCons constls.paint"));
+ if(!isGui) break;
+ try { SwingUtilities.invokeAndWait(CS.gui); } catch(Exception x) { throw new Error(x); }
+ if(!isGui || !isRun) break;
+ if(!go) {
+ log(4, switches("runCons constls.gui.wait"));
+ synchronized(gui) { try { gui.wait(); } catch(InterruptedException x) {} }
+ }
+ if(!isGui || !isRun) break;
+ log(4, switches("runCons constls.notifyAll"));
+ constls.notifyAll();
+ }
+ }
+ if(isGui && isRun) {
+ log(4, switches("runCons last repaint"));
+ try { SwingUtilities.invokeAndWait(CS.gui); } catch(Exception x) { throw new Error(x); }
+ if(!go) synchronized(gui) { try { gui.wait(); } catch(InterruptedException x) {} }
+ }
+ else {
+ log(4, switches("runCons stop"));
+ stop();
+ synchronized(constls) { constls.notifyAll(); }
+ while(spawned > 0) synchronized(this) { try { wait(); } catch(InterruptedException x) {} }
+ }
+ }
+ void runCons() {
+ if(isGui) pacing = true;
+ else pacing = false;
+ for(Constellation con : constls) { notFinished++; new Thread(con, con.label).start(); spawned++; }
+ if(isGui) runGuiCon();
+ else while(spawned > 0) synchronized(this) { try { wait(); } catch(InterruptedException x) {} }
+ }
+ public void run() {
+ if(isGui) dashboard();
+ if(isRun) {
+ log(1, "pgm=" + clsPath + getClass().getName() +
+ ", ttl=" + pttl + ", pace=" + ipace + "msecs, seed=" + rs + ", SSL=" + issl + ", fake=" + fake + ", debug=" + debug);
+ if(issl > 0) log(3, "cePath=" + cePath);
+ constellations();
+ if(!isGui) { r = new Random(rs); runCons(); }
+ else while(isGui) {
+ r = new Random(rs);
+ runCons();
+ log(1, "all constellations finished");
+ if(!isGui) break;
+ gui.parms.buttons.setGo();
+ if(!go) gui.parms.buttons.step.setEnabled(false);
+ if(abend) {
+ gui.parms.buttons.go.setEnabled(false);
+ gui.parms.buttons.step.setEnabled(false);
+ gui.parms.buttons.reset.setEnabled(false);
+ }
+ try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); }
+ if(!isReset) synchronized(gui) { try { gui.wait(); } catch(InterruptedException x) {} }
+ if(isGui) {
+ reset();
+ do synchronized(gui) {
+ try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); }
+ if(!isRun) try { gui.wait(); } catch(InterruptedException x) {}
+ if(isGui && isReset) reset();
+ } while(isGui && !isRun);
+ }
+ }
+ }
+ log(1, "final balance, connections=" + connCnt + ", forwards=" + forwCnt);
+ if(gui != null) gui.ui.dispose();
+ log(2, "run end");
+ }
+ public static void main(String[] args) throws Exception {
+ CS cs = new CS();
+ cs.run();
+ cs.log(1, "cs end");
+ //try { cs.run(); } catch(Exception x) { cs.log(0, "interrupted execution"); };
+ }
+}
+// rozeznání konce ve step-módu
+// pacing slide
+// input fields
+// ukládání parametrů
+// too many open files
+// exceptions
+// stavové zprávy na dashboardu
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSj/Makefile Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,2 @@
+all: CS.java
+ javac CS.java
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSp/CS.py Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,315 @@
+#!/usr/bin/python3
+# coding=utf8
+
+# ring nonssl: cca 8000 rings/sec*3nodes
+# ring ssl: cca 2600 rings/sec*3nodes
+# mash nonssl: cca 2000 mashes/sec*3nodes
+# mash ssl: cca 300 mashes/sec*3nodes
+# u mashe je čas na přenosy úměrný počtu uzlů,
+# kdežto čas na connect/close je úměrný počtu spojů tj. kvadrátu počtu uzlů
+
+import time
+import socket
+import errno
+import os
+import signal
+import sys
+import pickle
+import ssl
+import select
+import random
+import multiprocessing
+import threading
+
+
+class Debug():
+ def __init__(self, debid):
+ self.debid = debid
+ def log(self, level, *msg):
+ if level <= maxDebLev:
+ log_lock.acquire()
+ print("{:10.6f} {}:".format(time.time()-t0, self.debid), *msg, file=sys.stderr)
+ sys.stderr.flush()
+ log_lock.release()
+ def abend(self, s):
+ self.log(0, s)
+ os.kill(0, signal.SIGTERM)
+
+
+class Node(Debug):
+ def __init__(self, debid, forwarding, topo, port, p0, pn, issl):
+ Debug.__init__(self, "{} node {}".format(debid, port))
+ self.topo = topo
+ self.locPort = port
+ self.p0 = p0
+ self.pn = pn
+ self.ssc = None
+ self.cli_side = {}
+ self.srv_side = {}
+ self.kicker = (self.locPort == self.pn)
+ self.forwarding = forwarding # in forwarding kicker task indicates when TTL reached 0
+ self.closing = False
+ self.payload = None
+ self.issl = issl
+ if issl:
+ self.log(4, "setting SSL context...")
+ self.sslCert = cePath + "/certs/{}.pem".format(port)
+ self.sslKey = cePath + "/keys/{}.key".format(port)
+ try:
+ self.ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ self.ctx.verify_mode = ssl.CERT_REQUIRED
+ self.log(5, "cert={}, key={}".format(self.sslCert, self.sslKey))
+ self.ctx.load_cert_chain(self.sslCert, self.sslKey)
+ self.ctx.load_verify_locations(None, caPath)
+ except ssl.SSLError as e: self.abend("SSL context: {}".format(str(e)))
+ def run(self):
+ self.bind()
+ self.payload = Data(self.debid, self.locPort)
+ if self.kicker:
+ nxt = self.next_node()
+ self.log(1, "kicker ready to send '{}' to {}".format(self.payload.digest(), nxt))
+ if not nxt in self.cli_side: self.conn(nxt)
+ self.payload.put(self.cli_side[nxt])
+ if self.forwarding[0].value: wait_list = (self.ssc,)
+ while wait_list:
+ self.log(4, "select...")
+ self.log(5, "waitlist:", *(sc.fileno() for sc in wait_list))
+ ready_list = select.select(wait_list, (), (), sel_TO)
+ self.log(5, "readylist:", *(sc.fileno() for sc in ready_list[0]))
+ for sc in ready_list[0]:
+ self.log(5, "sc {} ready...".format(sc.fileno()))
+ if sc == self.ssc: sc = self.acc()
+ self.forward(sc)
+ wait_list = ()
+ self.log(4, "forwarding={}".format(self.forwarding[0].value))
+ if self.forwarding[0].value: wait_list += (self.ssc,) # when off, no new connection will come on ssc
+ else: self.close_cli()
+ for sc in self.srv_side.values(): wait_list += (sc,)
+ self.close_srv()
+ signal.signal(signal.SIGUSR2, sighand)
+ last = 0
+ ctr_lock.acquire()
+ active.value -= 1
+ if active.value == 0: last = 1
+ ctr_lock.release()
+ if last: os.kill(0, signal.SIGUSR2)
+ else: signal.pause()
+ self.log(2, "ended")
+ os._exit(0)
+ def forward(self, sc):
+ if not self.payload.get(sc):
+ self.log(5, "delete srv_side[{}]".format(sc.fileno()))
+ del self.srv_side[sc]
+ self.log(4, "closing {}...".format(sc.fileno()))
+ sc.close()
+ else:
+ self.log(5, "received data from {}".format(self.payload.rport))
+ ctr_lock.acquire();
+ forwards.value += 1;
+ ctr_lock.release()
+ if self.kicker:
+ self.payload.dttl()
+ self.log(3, "received from {}: {}, ttl={}".format(self.topo, self.payload.digest(), self.payload.ttl))
+ if self.payload.ttl <= 0:
+ self.log(1, "received after passing all {}: {}".\
+ format("mashes" if self.topo == "mash" else "rings", self.payload.digest()))
+ self.forwarding[0].value = 0
+ return
+ nxt = self.next_node()
+ self.log(5, "forwarding to {}...".format(nxt))
+ if pacing: time.sleep(pace)
+ if not nxt in self.cli_side: self.conn(nxt)
+ self.payload.put(self.cli_side[nxt])
+ self.log(5, "forwarded to {}".format(nxt))
+ def next_node(self):
+ if self.topo == "ring":
+ if self.kicker: nxt = self.p0
+ else: nxt = self.locPort + 1
+ else:
+ nxt = self.locPort
+ while nxt == self.locPort:
+ nxt = random.randint(self.p0, self.pn)
+ return nxt
+ def bind(self):
+ self.log(4, "binding...")
+ try:
+ self.ssc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.ssc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ except Exception as e:
+ self.abend("ssc alloc: {}".format(e.strerror))
+ if self.issl:
+ self.log(4, "ssc SSL wrap")
+ try: self.ssc = self.ctx.wrap_socket(self.ssc)
+ except ssl.SSLError as e: self.abend("ssc SSL wrap: {}".format(str(e)))
+ try:
+ self.ssc.bind(("127.0.0.1", self.locPort))
+ self.ssc.listen(1)
+ except Exception as e: self.abend("bind: {}".format(e.strerror))
+ self.log(2, "bound")
+ def conn(self, remPort):
+ self.log(4, "connecting to {}...".format(remPort))
+ try: sc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ except Exception as e: self.abend("socket alloc: {}".format(e.strerror))
+ if self.issl:
+ self.log(4, "sc SSL wrap")
+ try: sc = self.ctx.wrap_socket(sc)
+ except ssl.SSLError as e: self.abend("sc SSL wrap: {}".format(str(e)))
+ retry = connThreshold
+ connected = False
+ while not connected and retry > 0:
+ try:
+ sc.connect(("127.0.0.1", remPort))
+ connected = True
+ except Exception as e:
+ if e.errno == errno.ECONNREFUSED:
+ retry = retry - 1
+ time.sleep(conn_TO)
+ else: self.abend("connect: {}".format(str(e)))
+ if retry == 0: self.abend("connection refused, threshold {} reached".format(connThreshold))
+ ctr_lock.acquire();
+ connects.value += 1;
+ ctr_lock.release()
+ try: self.cli_side[remPort] = sc.makefile("wb")
+ except Exception as e: self.abend("client side makefile: {}".format(str(e)))
+ self.log(2, "connected to {} after {} retries".format(remPort, connThreshold - retry))
+ def acc(self):
+ self.log(4, "accepting...")
+ try:
+ ac = self.ssc.accept()
+ sc = ac[0]
+ except Exception as e: self.abend("accept: {}".format(str(e)))
+ try: sc = sc.makefile("rb")
+ except Exception as e: self.abend("srv side makefile: {}".format(str(e)))
+ self.srv_side[sc] = sc
+ self.log(2, "accepted on sc={}, addr={}".format(sc.fileno(), ac[1]))
+ return sc
+ def close_srv(self):
+ self.log(5, "closing ssc...")
+ if self.ssc: self.ssc.close()
+ def close_cli(self):
+ def do_close():
+ self.log(5, "closing clients...")
+ scs = self.cli_side.values()
+ self.cli_side.clear()
+ for sc in scs: sc.close()
+ self.log(4, "all clients closed")
+ if not self.closing:
+ threading.Thread(target = do_close, name = "client {} close".format(self.locPort)).start()
+ self.closing = True
+
+
+class Data(Debug):
+ def __init__(self, debid, port):
+ self.debid = debid + " payload"
+ self.clear()
+ self.ttl = ittl
+ self.lport = port
+ self.rport = 0
+ self.text = ""
+ def clear(self):
+ (self.ttl, self.rport, self.text) = 3 * (None,)
+ def put(self, sc):
+ self.log(5, "sending via {}...".format(sc.fileno()))
+ if self.ttl == None: self.ttl = ittl
+ if self.text == None: self.text = itext
+ try:
+ pickle.dump((self.ttl, self.lport, self.text), sc)
+ sc.flush()
+ except Exception as e: self.abend("send: {}".format(str(e)))
+ def get(self, sc):
+ self.log(5, "reading from {}...".format(sc.fileno()))
+ self.clear()
+ try:
+ (self.ttl, self.rport, self.text) = pickle.load(sc)
+ return True
+ except Exception as e:
+ if isinstance(e, EOFError): return False
+ else: self.abend("receive: {}".format(str(e)))
+ def dttl(self):
+ self.ttl -= 1
+ return self.ttl
+ def digest(self):
+ return self.text if len(self.text) < 24 else self.text[0:8]+"--------"+self.text[-8:]
+ def toString(self):
+ return "ttl={}, from port={}, text={}".\
+ format(str(self.ttl), str(self.rport), self.digest())
+
+class Constellation(Debug):
+ def __init__(self, issl, topo, p0, n):
+ Debug.__init__(self, "{}SSL {}".format("" if issl else "non", topo.upper()))
+ def run(self, issl, topo, p0, n):
+ signal.signal(signal.SIGUSR2, signal.SIG_IGN)
+ forwarding = [multiprocessing.Value('i', 1, lock=False)] # list is passed by reference
+ if n == 1:
+ self.log(0, "one-node configuration is not implemented")
+ else:
+ self.log(1, "{} nodes starting...".format(n))
+ p0 += 500 if issl else 0
+ pn = p0 + n - 1
+ for port in range(p0, p0 + n):
+ pid = os.fork()
+ if not pid: Node(self.debid, forwarding, topo, port, p0, pn, issl).run()
+ else: self.log(4, "node {} started in process {}".format(port, pid))
+ self.log(2, "all nodes established")
+ while 1:
+ try:
+ p = os.wait()
+ if p[1] & 255:
+ self.log(4, "pid {} killed by {}".format(p[0], p[1] & 255))
+ else:
+ self.log(4, "pid {} returned {}".format(p[0], p[1] >> 8))
+ except: break
+ os._exit(0)
+
+def ga(key, default):
+ return os.environ[key] if key in os.environ else default
+def gi(key, default):
+ return int(ga(key, default))
+def sighand(signal, frame):
+ pass
+
+debug = Debug("client/server demo")
+log_lock = multiprocessing.Lock()
+ctr_lock = multiprocessing.Lock()
+forwards = multiprocessing.Value('i', 0)
+connects = multiprocessing.Value('i', 0)
+active = multiprocessing.Value('i', 0)
+t0 = time.time()
+maxDebLev = gi('DEB', 0)
+mn = rn = gi('N', 0)
+rp0 = gi('RP0', 11000)
+rn = gi('RN', 3)
+mp0 = gi('MP0', 12000)
+mn = gi('MN', 3)
+itext = ga('T', "bla bla")
+ittl = gi('TTL', 3)
+pace = float(os.environ["P"]) if "P" in os.environ else 0
+pacing = 1 if pace > 0 else 0
+random.seed(gi('RS', 0))
+connThreshold = 77
+conn_TO = 0.01
+sel_TO = 1
+issl = gi('SSL', 0)
+caPath = "/home/local/etc/ssl/certs/"
+sslPathSuff = "/../CS/"
+cePath = os.environ["CEP"] if "CEP" in os.environ else os.path.dirname(sys.argv[0]) + sslPathSuff
+active.value = mn + rn
+if issl > 1: active.value *= 2
+signal.signal(signal.SIGUSR2, signal.SIG_IGN)
+debug.log(1, "pgm={}, ttl={}, pace={}, seed={}, SSL mask={}, debug={}".format(sys.argv[0], ittl, pace, gi('RS', 0), issl, maxDebLev))
+if issl > 0: debug.log(3, "ssl path: {}, CA path: {}".format(cePath, caPath))
+
+if issl < 2: issl = (issl,)
+else: issl = (0, 1)
+if ittl > 0:
+ for ss in issl:
+ topo = ("mash", "ring")
+ p0 = (mp0, rp0)
+ n = (mn, rn)
+ for p in zip(topo, p0, n):
+ if not os.fork():
+ Constellation(ss, *p).run(ss, *p)
+ while 1:
+ try: os.wait()
+ except: break
+debug.log(1, "final balance: forwards={}, connections={}".format(forwards.value, connects.value))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cs Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,75 @@
+#!/bin/bash
+
+DEBID="client/server DEMO starting script"
+D=$(dirname $0)
+
+[[ $(basename $0) == cs ]] && {
+ echo $DEBID:
+ echo "Syntax: <env> $(dirname $0)/{csa,csc,cscpp,csj,csp}"
+ echo "env:"
+ echo -e "\tDEB\tdebug level 0-9; 7=msgs of level 7 only; 9=asm komponent sandbox"
+ echo -e "\tRS\trandom seed"
+ echo -e "\tP\tpacing, secs in float"
+ echo -e "\tT\ttext sent"
+ echo -e "\tMP0\tlistening port of the first node of MASH constellation"
+ echo -e "\tPP0\tlistening port of the first node of RING constellation"
+ echo -e "\tMN\tnum. of MASH nodes"
+ echo -e "\tRN\tnum. of RING nodes"
+ echo -e "\tN\tnum. of MASH and RING nodes; overchanged by MN/RN"
+ echo -e "\tTTL"
+ echo -e "\tSSL\t0=no SSL, 1=SSL, 2=both"
+ echo -e "\tCAP\tpath to SSL CA certs"
+ echo -e "\tCEP\tpath to local SSL stuff (keys and certificates)"
+ exit 1
+ }
+
+# explicit values for this run
+_MP0=$MP0
+_RP0=$RP0
+_MN=$MN
+_RN=$RN
+
+[[ $_MP0 ]] && MP0=$_MP0
+[[ $_RP0 ]] && RP0=$_RP0
+
+[[ $_MN ]] && MN=$_MN
+[[ $_RN ]] && RN=$_RN
+
+[[ $N ]] && { MN=$N; RN=$N; }
+
+[[ -z $CEP ]] && CEP=$D/CS/ # SSL certs path
+[[ -z $CAP ]] && CAP="/home/local/etc/ssl/certs/" # CA certs ppath
+[[ -z $RS ]] && RS=$RANDOM
+
+p=
+[[ $DEB ]] && p+="DEB=$DEB"
+[[ $RN ]] && p+=" RN=$RN"
+[[ $RP0 ]] && p+=" RP0=$RP0"
+[[ $MN ]] && p+=" MN=$MN"
+[[ $MP0 ]] && p+=" MP0=$MP0"
+[[ $N ]] && p+=" N=$N"
+[[ $T ]] && p+=' T="$T"'
+[[ $SSL ]] && p+=" SSL=$SSL"
+[[ $TTL ]] && p+=" TTL=$TTL"
+[[ $FAKE ]] && p+=" FAKE=$FAKE"
+[[ $STO ]] && p+=" STO=$STO"
+[[ $CAP ]] && p+=" CAP=$CAP"
+[[ $CEP ]] && p+=" CEP=$CEP"
+[[ $P ]] && p+=" P=$P"
+[[ $RS ]] && p+=" RS=$RS"
+
+trap "echo $DEBID: signal USR1 caught" USR1
+trap "" USR2
+trap "echo $DEBID: ABEND" TERM
+case $(basename $0) in
+ csp ) eval $p $D/CSp/CS.py ;;
+# csj ) eval $p java -cp $D/CSj cs.CS ;;
+ csj ) eval $p java -cp $D/CSj CS ;;
+ cscpp ) eval $p $D/CScpp/CS ;;
+ csc ) eval $p $D/CSc/CS ;;
+ csa ) eval $p $D/CSa32/CS ;;
+ * ) ;;
+esac
+RC=$?
+[[ $RS && $DEB -gt 0 ]] && echo RS: $RS, RC: $RC
+exit $RC
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/csa Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,1 @@
+cs
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/csc Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,1 @@
+cs
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cscpp Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,1 @@
+cs
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/csj Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,1 @@
+cs
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/csp Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,1 @@
+cs
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/csr Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,1 @@
+cs
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/cst Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,1 @@
+cs
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/meter Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,59 @@
+#/bin/bash
+
+trap "" USR2 # USR2 is used to synchronize subprocesses
+
+home=$(dirname $0)
+
+save() {
+ cat >$fn <<EOD
+RP0=$1
+TS=$2
+EOD
+}
+
+exi() {
+ echo TRAP
+ eval $(posun)
+ save $((port+n)) $(date +%s)
+ exit 1
+}
+
+posun() {
+ ((n+=RN))
+ ((n>skok)) && { ((n+=500)); ((skok+=1000)); }
+ echo n=$n
+ echo skok=$skok
+}
+
+trap exi 2
+fn=$(basename $0).port
+[[ -e $fn ]] || save 11000 $(date +%s)
+. $fn
+D=$(date +%s)-$TS
+((RP0>29999)) || ((D>60)) && { RP0=11000; TS=$(date +%s); save $RP0 $TS; }
+declare -a langs
+langs=(Asm C C++ Java Python)
+declare -A cmds times
+cmds=([Asm]="csa" [C]="csc" ["C++"]="cscpp" [Java]="csj" [Python]="csp")
+
+[[ $RUNS ]] || export RUNS=3
+[[ $DEB ]] || export DEB=0
+[[ $MN ]] || export MN=0
+[[ $RN ]] || export RN=33 # pro python radši ne víc než 50
+[[ $TTL ]] || export TTL=3333
+[[ $SSL ]] || export SSL=0
+echo -e DEB=$DEB\\tMN=$MN\\tRN=$RN\\tTTL=$TTL\\tSSL=$SSL
+
+n=0; skok=500; port=$RP0
+for j in $(seq $RUNS)
+ do
+ for k in "${langs[@]}"
+ do
+ export RP0=$((port+n))
+ times[$k]=$(/usr/bin/time -o /dev/stdout -f %U $home/${cmds[$k]})
+ printf "%-12s%4.2f\n" $k ${times[$k]}
+ eval $(posun)
+ done
+ echo
+done
+save $((port+n)) $(date +%s)