--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/CSa32/Node.S Thu Nov 21 14:55:10 2019 +0100
@@ -0,0 +1,1616 @@
+ .include "DS.S"
+ .data
+ .text
+#-----------------------------------------------
+# N O D E O P E R A T I O N S
+#-----------------------------------------------
+ ARGS
+ DS CnstlnP # -->Constellation inst
+ DS port # node's port#
+# returns
+# nothing
+# local vars
+ PROLOC
+ DL this, NodeL # this Node instance
+ DL thisP # -->this Node
+ DL deP # -->Debug inst for LOG macro
+ DL rc
+ DL tosignal # switch
+ DL sigact, SigActionL
+ DL sigmask, 128
+ DL sigret
+ EPILOC
+#-----------------------------------------------
+ .global Node
+Node:
+ PROLOG
+ lea this(bp), b # -->this Node
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save -->Debug locally for LOG macro
+# save parm values
+ mov CnstlnP(bp), c # -->Constellation
+ mov c, N.cnstlnP(b) # save -->Constellation
+ mov port(bp), a
+ mov a, N.locPort(b) # save port#
+# set Debug id
+ push a
+ cmpl $Cn.ring, Cn.topo(c)
+ je 0f
+ push $8f
+ jmp 1f
+0: push $7f
+1: cmp $0,Cn.ssl(c)
+ je 2f
+ push $6f
+ jmp 9f
+2: push $5f
+ jmp 9f
+5: .asciz "non"
+6: .asciz ""
+7: .asciz "RING"
+8: .asciz "MASH"
+9: DEBID "%sSSL %s node %d", 3 # N->ssl ? "" : "non", N.topo==mash ? "MASH" : "RING", locPort
+ call N.init # initialize node
+ call N.bind # bind node to local port
+ push N.kicker(b) # kicker indicator
+ LOG 5, "kicker=%d"
+ cmpl $0, N.kicker(b) # kicker ?
+ je 0f # no
+ call N.kickOff # start transfer
+0:
+ call N.mainLoop # iterate on socket I/O select
+ LOG 5, "closing ssc"
+ push N.ssc(b)
+ SYS close
+ movl $0, rc(bp) # rc = OK
+# wait on thread closing client side
+ cmpl $0, N.closing(b) # closing client side ?
+ je N.chkData # no
+ push $0
+ push N.ptid(b) # thread id
+ call pthread_join
+ test a, a
+ jz 1f
+ push a # unique error handlig for pthread_join
+ call strerror
+ push a
+ LOG 0, "pthread_join: %s"
+ push deP(bp)
+ call C.abend
+1:
+ mov thisP(bp), b # -->this Node
+ push N.pid(b)
+ LOG 5, "task %u: closing thread returned"
+# synchronize with all nodes in all constellations and then check received data
+N.chkData:
+ call N.sync # wait for others
+ cmpl $0, N.kicker(b) # kicker ?
+ je 0f # no
+
+ push N.dataP(b) # -->Data
+ call Da.chk
+ test a, a # data check OK ?
+ jnz 0f # yes
+ movl $1, rc(bp) # rc = BAD
+ LOG 0, "data on INPUT TO and OUTPUT FROM constellation DIFFER"
+0:
+ push N.pid(b)
+ push rc(bp)
+ LOG 2, "end of operations, rc=%d, process=%u"
+ call exit
+#-----------------------------------------------
+# C O N S T R U C T O R
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug
+# DL CnstlnP # -->Constellation
+ DL len # various lengths
+ strL = 128
+ DL str, strL # string buf
+ EPILOC
+#-----------------------------------------------
+N.init: # c: -->Cnstln
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ LOG 4, "initializing..."
+# initialize node's attributes
+ call getpid
+ mov a, N.pid(b)
+ mov N.cnstlnP(b), c # -->Constellation
+ push deP(bp) # -->caller Debug id
+ lea N.data(b), a
+ mov a, N.dataP(b)
+ push a # -->Data
+ call Data # initialize data item
+ mov Cn.topo(c), a
+ mov a, N.topo(b) # save topology
+ mov Cn.ssl(c), a
+ mov a, N.ssl(b) # SSL switch
+ mov Cn.first(c), a
+ mov a, N.first(b) # save port# of first node in constellation
+ movl $0, N.kicker(b) # clear kicker switch
+ cmp a, N.locPort(b) # kicker ? (locPort == first)
+ sete N.kicker(b)
+1: add Cn.nodes(c), a
+ dec a
+ mov a, N.last(b) # save port# of last node (last = first + nodes -1)
+ mov Cn.nodes(c), a
+ mov a, N.nodes(b) # save num. of nodes in constellation
+ mov Cn.div(c), a
+ mov a, N.div(b) # save divisor for random choice (MAX_INT / nodes)
+ mov Cn.topo(c), a
+ mov a, N.topo(b)
+ mov Cn.forwP(c), a
+ mov a, N.forwP(b) # save -->shared "forward" indicator
+ movl $0, N.closing(b) # unset closing indicator
+# allocate and initialize server side and client side sockets
+ mov $SocketInfoL, a
+ mull N.nodes(b) # SocketInfo array len
+ mov a, N.sockArrLen(b) # save array len
+ push a
+ call malloc
+ cmp $0, a
+ jnl 0f
+ SYSERR "malloc"
+0: mov thisP(bp), b # -->this Node
+ mov a, N.srvSideP(b) # save -->array of server side sockets
+ pushl N.sockArrLen(b)
+ call malloc
+ cmp $0, a
+ jnl 0f
+ SYSERR "malloc"
+0: mov thisP(bp), b # -->this Node
+ mov a, N.cliSideP(b) # save -->array of client side sockets
+# initialize allocated socket infos
+ xor c, c # offset
+0: mov N.srvSideP(b), a # -->server side SocketInfo array
+# movl $So.labelV, So.label(a, c) # indicate SocketInfo block
+ movl $-1, So.sc(a, c) # indicate server side socket is not in use
+ mov N.cliSideP(b), a # -->client side SocketInfo array
+# movl $So.labelV, So.label(a, c) # indicate SocketInfo block
+ movl $-1, So.sc(a, c) # indicate client side socket is not in use
+ lea SocketInfoL(c), c # incr by SocketInfo len
+ cmp c, N.sockArrLen(b)
+ ja 0b # iterate
+
+ cmpl $0, N.ssl(b) # node's ssl switch
+ jz N.initR # no SSL
+# prepare SSL context
+# call SSL_load_error_strings
+# call SSL_library_init
+ call OPENSSL_init_ssl
+ LOG 4, "setting SSL contex...", 0
+ call TLS_method
+ push a
+ call SSL_CTX_new
+ mov thisP(bp), b # -->this Node
+ mov a, N.ctxP(b) # save -->new SSL context
+ test a, a
+ jnz 0f
+ SSLERR "new SSL CTX"
+# set SSL mode
+0: pushl $0
+ pushl $4 # SSL_MODE_AUTO_RETRY
+ pushl $33 # SSL_CTRL_MODE
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_CTX_ctrl
+ movl $0, 8(sp) # no callback
+ movl $2, 4(sp) # mode = SSL_VERIFY_FAIL_IF_NO_PEER_CERT
+ call SSL_CTX_set_verify # set peer certificate verification parameters
+# set X509 key file name
+ mov thisP(bp), b # -->this Node
+ pushl N.locPort(b) # local TCP port#
+ mov C.csP, c
+ pushl C.cePathP(c) # -->name of SSL path
+ pushl $0f # -->format
+ pushl $strL # buf len
+ lea str(bp), a
+ push a # -->string buf
+ call snprintf
+ jmp 1f
+0: .asciz "%skeys/%u.key"
+1:
+ pushl $1 # SSL_FILETYPE_PEM
+ lea str(bp), a # filename string
+ push a
+ LOG 5, "SSL private key used: %s", 1
+ mov thisP(bp), b # -->this Node inst
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_CTX_use_PrivateKey_file
+ cmp $1, a
+ je 0f
+ SSLERR "hh's key file"
+# set X509 cert file name
+0: mov thisP(bp), b # -->this Node
+ pushl N.locPort(b) # local TCP port #
+ mov C.csP, c
+ pushl C.cePathP(c) # -->name of SSL path
+ pushl $0f
+ pushl $strL # buf len
+ lea str(bp), a # string buf
+ push a
+ call snprintf
+ jmp 1f
+0: .asciz "%scerts/%u.pem"
+1: mov thisP(bp), b # -->this Node
+ LOG 5, "SSL certificate used: %s", 1
+ pushl $1 # SSL_FILETYPE_PEM
+ lea str(bp), a # filename string
+ push a
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_CTX_use_certificate_file
+ cmp $1, a
+ je 0f
+ SSLERR "hh's cert file"
+# set path to CA
+0: mov thisP(bp), b # -->this Node
+ mov C.csP, c
+ pushl C.caPathP(c) # -->name of SSL path
+ pushl $0f # -->CApath
+ LOG 5, "SSL: CA path: %s", 1
+ pushl $0 # -->CAfile not used
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_CTX_load_verify_locations # set default locations for trusted CA certificates
+ cmp $1, a
+ je N.initR
+ SSLERR "hh's thrusted certs path"
+0: .asciz "/home/local/etc/ssl/certs/"
+N.initR:
+ LOG 5, "initalized"
+ EPILOG
+#-----------------------------------------------
+# K I C K O F F
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug
+ DL sci # socket info rank
+ DL digest, 24
+ EPILOC
+N.kickOff:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+# load payload data
+ movl C.csP, c # -->CS
+ push C.txtP(c) # -->initial payload text
+ push C.ttl(c) # initial TTL
+# lea N.data(b), a # -->Data
+# push a
+ push N.dataP(b) # -->Data
+ call Da.load # load data container
+ call N.nextNode
+ mov a, N.next(b) # save next node#
+ sub N.first(b), a # first - next = socketinfo rank
+ mov a, sci(bp) # save socket info rank
+ lea digest(bp), a # -->digest buf
+ push a
+ push N.dataP(b) # -->Data
+ call Da.digest24
+ call Da.getDataLen
+ push N.next(b) # next node#
+ push a # data len
+ lea digest(bp), a # -->digest
+ push a
+ LOG 2, "kicker: ready to initial send %s, len=%u to node %u"
+ push N.next(b)
+ push sci(bp)
+ call N.conn # connect to next node
+ push sci(bp)
+ call N.put # send data to next node
+ EPILOG
+#-----------------------------------------------
+# M A I N L O O P : I T E R A T E O N S O C K E T I / O
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug
+ EPILOC
+N.mainLoop:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+# prepare
+ LOG 5, "preparing for I/O select..."
+ call N.clearSocketMasks # clear socket masks for select and zero nfds
+ call N.maskSsc # mask ssc for select
+# iterate while there are some sockets masked for select (nfds > 0)
+0: cmp $0, N.nfds(b) # any I/O in progress ?
+ jz 0f # no, end operations of node
+ call N.selectSocketIo # select on server side sockets and forward data
+ call N.clearSocketMasks # clear socket masks for select and zero nfds
+ call N.maskSsc # mask ssc for select
+ call N.maskSrvSockets # mask all connected server side sockets for select
+ jmp 0b
+0: EPILOG
+#-----------------------------------------------
+# S E L E C T S O C K E T I / O
+#-----------------------------------------------
+# select on masked sockets
+# upon return from select
+# accept connections to ssc
+# forward data from posted sockets
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug
+ EPILOC
+N.selectSocketIo:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+
+ push N.nfds(b)
+ lea N.rs(b), a
+ push (a)
+ push 4(a)
+ push N.ssc(b)
+ LOG 5, "select ssc=%u, mask=%08x %08x, nfds=%u"
+ lea N.t(b), a # -->timeval
+ movl $1, Ti.secs(a)
+ movl $0, Ti.usecs(a)
+ push a
+ lea N.es(b), a
+ push a # -->es (exceptional mask is not used yet)
+ pushl $0
+ lea N.rs(b), a # -->rs
+ push a
+ push N.nfds(b)
+ SYS select
+ lea N.rs(b), a
+ push (a)
+ push 4(a)
+ LOG 5, "return from select, mask of posted=%08x %08x"
+# check ssc for incomming connect request, accept it and forward data
+ lea N.rs(b), a # -->rs select mask
+ push a
+ pushl N.ssc(b)
+ call fd_isset
+ test a, a # ssc I/O ?
+ jz N.checkAndForw # no, check other sockets
+# prepare for accept; find free srv socket info block for accept
+ mov N.srvSideP(b), a # -->srv socket info array
+ xor c, c # offet in socket info array
+ xor d, d # socket info rank
+0: cmp $-1, So.sc(a, c) # socket allocated ?
+ je 0f # no, free socket found
+ inc d
+ lea SocketInfoL(c), c # inc offset into array
+ cmp c, N.sockArrLen(b)
+ ja 0b # iterate on sockets
+ ERR "can't accept, all sockets in use"
+# free socket found, accept connection on it and forward data
+0: push d
+ LOG 4, "slot for accept=%d"
+ call N.acc
+ call N.forw
+# check all server side sockets for pending I/O and call forward on them
+N.checkAndForw:
+ xor c, c # offet into socket info array
+ lea N.rs(b), a # -->rs select mask
+ push a
+ lea -4(sp), sp # adjust stacker for iteration
+ xor c, c # zero offset
+ xor d, d # set counter
+# iterate through server side sockets and forward from posted sockets
+0: mov N.srvSideP(b), a # -->srv socket info array
+ cmp $-1, So.sc(a, c) # socket connected ?
+ je 1f # no, iterate
+ mov So.sc(a, c), a
+ mov a, (sp) # stack socket#
+# LOG 0, "checking port %u"
+ call fd_isset # socket I/O ?
+ test a, a
+ jz 1f # no, iterate
+ mov N.srvSideP(b), a
+ mov So.sc(a, c), a
+ mov d, (sp) # stack socket rank
+# LOG 0, "port posted, rank=%u"
+ call N.forw # forward data
+1: inc d
+ lea SocketInfoL(c), c # inc offset into array
+ cmp c, N.sockArrLen(b)
+ ja 0b # iterate on srv side sockets
+ EPILOG
+#-----------------------------------------------
+# M A S K C O N N E C T E D S O C K E T S F O R S E L E C T
+#-----------------------------------------------
+# mask all connected server side socketS for next select
+ PROLOC
+ DL thisP # -->this Node
+ EPILOC
+N.maskSrvSockets:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+
+ lea N.rs(b), a # -->rs select mask
+ push a
+ lea -4(sp), sp # adjust stacker for iteration
+ xor c, c # zero offet
+0: mov N.srvSideP(b), a # -->srv socket info array
+ cmp $-1, So.sc(a, c) # socket connected ?
+ je 1f # no, iterate
+ mov So.sc(a, c), a
+ mov a, (sp) # stack socket#
+ call N.maskSocket # mask socket for select
+1: lea SocketInfoL(c), c # inc offset into array
+ cmp c, N.sockArrLen(b)
+ ja 0b # iterate on srv side sockets
+ EPILOG
+#-----------------------------------------------
+# M A S K S S C F O R S E L E C T
+#-----------------------------------------------
+# mask ssc for select until forward is disabled
+ PROLOC
+ DL thisP # -->this Node
+ EPILOC
+N.maskSsc:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+
+ mov N.forwP(b), a
+ cmp $1, (a) # forwarding enabled ?
+ jne 0f # no, return
+ push N.ssc(b)
+ call N.maskSocket
+0: EPILOG
+#-----------------------------------------------
+# C L E A R S O C K E T M A S K S
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ EPILOC
+N.clearSocketMasks:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+
+ movl $0, N.nfds(b) # zero nfds
+ lea N.rs(b), a # zero rs select mask
+ push a
+ call fd_zero
+ lea N.es(b), a # zero es select masks
+ push a
+ call fd_zero
+ EPILOG
+#-----------------------------------------------
+# M A S K S O C K E T F O R S E L E C T
+#-----------------------------------------------
+ ARGS
+ DS sc # socket#
+ PROLOC
+ DL thisP # -->this Node
+ EPILOC
+N.maskSocket:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+
+ lea N.rs(b), a # -->rs select mask
+ push a
+ pushl sc(bp)
+ call fd_set
+ mov sc(bp), a # socket#
+ cmp a, N.nfds(b) # nfds > sc ?
+ jg 0f # yes
+ inc a
+ mov a, N.nfds(b) # nfds = sc + 1
+0: EPILOG
+#-----------------------------------------------
+# F D S E T O P E R A T I N O S
+#-----------------------------------------------
+ PROLOC # local data frame def. is used by 4 following subrs
+ DL thisP # -->this Node
+ dL deP
+ EPILOC
+fd_zero:
+# zero FD mask
+ ARGS
+ DS fdsetP # -->FD mask
+ PROLOG
+ mov fdsetP(bp), a
+ xor c, c
+0: movl $0, (a, c, 4)
+ inc c
+ cmp $32, c # size of FD mask is 32 int (1024 bits)
+ jl 0b
+ EPILOG
+fd_set:
+# mask socket in FD mask
+ ARGS
+ DS sc
+ DS fdsetP # -->FD mask
+ PROLOG
+ movl sc(bp), a # sc
+ call fd_mask # a: integer offset, d: "1" bit in position according to sc
+ mov fdsetP(bp), c # -->select mask
+ or d, (c, a) # set sc mask
+ EPILOG
+fd_clear:
+# clear socket from FD mask
+ ARGS
+ DS sc
+ DS fdsetP # -->FD mask
+ PROLOG
+ movl sc(bp), a # sc
+ call fd_mask # a: integer offset, d: "1" bit in position according to sc
+ mov fdsetP(bp), c # -->select mask
+ not d
+ and d, (c, a) # clear sc mask
+ EPILOG
+fd_isset:
+# test socket bit status in FD mask
+ ARGS
+ DS sc
+ DS fdsetP # -->FD mask
+ PROLOG
+ movl sc(bp), a # sc
+ call fd_mask # a: integer offset, d: "1" bit in position according to sc
+ mov fdsetP(bp), c # -->select mask
+ and (c, a), d # sc selected ?
+ mov d, a
+# mov $0, a
+# jz 0f # not selected
+# mov $1, a
+0: EPILOG_R
+fd_mask: # a: sc
+# adjust offsets into FD mask
+ xor d, d
+ movl $32, c
+ divl c # a: mask integer rank, c: bit offset
+ shll $2, a # a: mask integer offset
+ mov d, c
+ mov $1, d
+ shl %cl, d # d: "1" bit in position according to sc
+ ret # a: offset of mask integer
+#-----------------------------------------------
+# F O R W A R D D A T A T O O T H E R N O D E
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug inst
+ DL digest, 24
+ EPILOC
+#-----------------------------------------------
+N.forw:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+
+ push sci(bp) # socket info rank
+ call N.get
+ test a, a # data read ?
+ jz N.forwCloseSock # no, EOF, close this socket and start closing clients
+# kicker checks TTL
+ cmp $1, N.kicker(b) # kicker ?
+ jne 1f # no, continue forwarding
+
+ lea -4(sp), sp # adjust stacker
+ lea digest(bp), a
+ push a # -->digest buf
+ push N.dataP(b) # -->Data
+ call Da.ttl
+ mov a, 8(sp) # TTL from data header
+ call Da.digest24
+ mov a, 4(sp) # -->text digest
+ call Da.getPort
+ mov a, (sp) # remote listen port from data header
+ LOG 5, "received from node %u: %s, ttl=%d"
+ push N.dataP(b) # -->Data
+ call Da.dttl # decrement TTL in data
+ test a, a # TTl > 0 ?
+ jz N.forwKickerStop # no, disable forwarding in constellation
+# forward data
+1: call N.nextNode # calculate next node
+ mov a, N.next(b)
+ push a
+ LOG 4, "next node %u"
+ sub N.first(b), a # socket info rank (next - first)
+ mov a, sci(bp) # save
+# connect if not connected yet
+ movl $SocketInfoL, d
+ mul d # a: offset into socket info array
+ mov N.cliSideP(b), c # -->client side socket info array
+ cmp $-1, So.sc(c, a) # connected ?
+ jne 0f # yes
+ push N.next(b)
+ push sci(bp)
+ call N.conn
+0:
+ lea -4(sp), sp
+ push N.dataP(b) # -->Data
+ call Da.ttl
+ mov a, 4(sp) # ttl
+ call Da.getDataLen
+ mov a, (sp) # container len
+ push N.next(b) # next node#
+ LOG 5, "forwarding to %d, len=%d, ttl=%d --->"
+# pacing
+ mov C.csP, c # -->CS
+ cmp $0, C.pacing(c) # pacing active ?
+ je 0f # no
+ pushl $0
+ lea C.pace.tv_sec(c), a
+ push a
+ SYS nanosleep # pace
+0: push sci(bp)
+ call N.put
+ LOG 4, "leaving forward"
+ jmp N.forwR # exit forwarding
+N.forwKickerStop: # ttl = 0
+ lea digest(bp), a
+ push a # -->digest buf
+ push N.dataP(b) # -->Data
+ call Da.digest24
+ push a
+ LOG 1, "received after finally passing all: %s", 1
+ mov N.forwP(b), a # -->shared forw indicator
+ movl $0, (a) # disable forwarding for all nodes in constellation
+ jmp N.forwCloseCli
+N.forwCloseSock:
+ pushl deP(bp)
+ pushl $1 # indicate "server side"
+ push sci(bp) # socket info rank
+ call N.closeSocket # close this server side socket
+N.forwCloseCli:
+ call N.closeClients # launch client side closing thread
+ LOG 4, "leaving forward, closing"
+N.forwR:
+ EPILOG
+#-----------------------------------------------
+# D E T E R M I N E N E X T N O D E
+#-----------------------------------------------
+ PROLOC
+ DL thisP # -->this Node
+ EPILOC
+N.nextNode:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+
+ cmp $Cn.ring, N.topo(b) # ring ?
+ jne 0f # no
+# ring
+ mov N.locPort(b), a
+ inc a # next = locPort + 1
+ cmp a, N.last(b) # next > last ?
+ jnl N.nextNodeR
+ mov N.first(b), a
+ jmp N.nextNodeR
+# mash
+0: call random
+ xor d, d
+ mov thisP(bp), b
+ divl N.div(b) # 0 <= random < nodes
+ add N.first(b), a # first + random
+ cmp a, N.locPort(b)
+ je 0b # iterate until other then local node is selected
+N.nextNodeR:
+ push a
+ EPILOG_R
+
+#-----------------------------------------------
+# B I N D T O L O C A L T C P P O R T
+#-----------------------------------------------
+# returns
+# nothing
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug
+ DL i
+ DL aiP # -->IP addrinfo
+ strL = 64
+ DL str, strL # string buf
+ EPILOC
+#-----------------------------------------------
+ .global N.bind
+N.bind:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ LOG 4, "binding...", 0
+# prepare IP addr
+ pushl $AddrInfoL
+ call malloc # alloc addrinfo for hints
+ mov a, aiP(bp)
+ pushl $AddrInfoL
+ pushl $0
+ push a
+ call memset # clear hints
+
+ movl $2, ai_family(a) # AF_INET
+ movl $1, ai_flags(a) # AI_PASSIVE for bind
+ movl $0, ai_protocol(a) # any protocol
+ movl $1, ai_socktype(a) # SOCK_STREAM, blocking type
+
+ mov thisP(bp), b # -->this Node
+ pushl N.locPort(b) # local port #
+ push $0f
+ lea str(bp), a
+ push a # -->str buff
+ call sprintf
+ jmp 1f
+0: .ascii "%d\0"
+1:
+ lea aiP(bp), a
+ push a # -->-->addrinfo for new addrinfo
+ pushl aiP(bp) # -->addrinfo hints
+ lea str(bp), a
+ push a # -->loc port # string
+ pushl $0f
+ call getaddrinfo
+ jmp 1f
+0: .ascii "localhost\0"
+# check IP addr
+1: cmp $0, a # getaddrinfo OK ?
+ jz 0f # yes
+ push a
+ call gai_strerror
+ push a
+ LOG 0, "getaddrinfo error: %s, ABEND", 1
+ push $1
+ call exit
+# log assigned IP addr
+0: pushl deP(bp)
+ mov aiP(bp), a # -->addrinfo
+ pushl ai_addrP(a) # -->sockaddr
+ lea str(bp), a
+ push a # -->string buf
+ call gai
+
+ lea str(bp), a
+ push a
+ LOG 5, "getaddrinfo OK, %s", 1
+# allocate socket
+ mov aiP(bp), a
+ pushl ai_protocol(a)
+ pushl ai_socktype(a)
+ pushl ai_family(a)
+ SYS socket
+ mov a, N.ssc(b) # save ssc
+# set socket option SO_REUSEADDR
+ pushl $4 # integer width
+ lea 9f, a # value 1
+ push a
+ pushl $2 # SO_REUSEADDR
+ pushl $1 # SOL_SOCKET
+ push N.ssc(b) # ssc
+ SYS setsockopt
+ jmp 8f
+9: .long 1
+8:
+# bind
+ mov aiP(bp), a
+ pushl ai_addrlen(a)
+ pushl ai_addrP(a)
+ push N.ssc(b) # ssc
+ SYS bind
+# listen
+ pushl $1 # pend conns queue len
+ pushl N.ssc(b) # ssc
+ SYS listen
+ pushl N.locPort(b)
+ LOG 2, "bound to %d", 1
+N.bindR:
+ EPILOG
+#-----------------------------------------------
+# C O N N E C T T O R E M O T E N O D E
+#-----------------------------------------------
+# args
+ ac = 40
+ DS i # socket rank
+ DS remPort # TCP port# on remote site
+# returns
+# nothing
+# local vars
+ ac = 0
+ DL debug, DebugL # Debug inst
+ DL deP # -->Debug inst
+ DL retry # retry counter
+ DL ai, AddrInfoL # IP addrinfo
+ DL aiP # -->IP addrinfo
+ DL str, 64 # string buf
+ DL currSockP # -->SocketInfo save area
+ locL = ac
+#-----------------------------------------------
+ .global N.conn
+N.conn:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea debug(bp), a # -->own local debug block
+ mov a, deP(bp) # save -->local Debug
+ pushl remPort(bp) # remote port
+ lea N.debug(b), d # -->node's Debug
+ lea D.id(d), a # -->node's debug ID
+ push a
+ DEBID "%s to %u", 2 # set local debug ID
+ LOG 3, "connecting...", 0
+
+ mov N.cliSideP(b), c # -->SocketInfo array
+ mov $SocketInfoL, a
+ mull i(bp) # SocketInfo offset
+ lea (c, a), c # -->curr SocketInfo
+ mov c, currSockP(bp) # save -->curr SocketInfo
+ mov remPort(bp), a
+ mov a, So.remPort(c) # put remote port # into curr SocketInfo
+# prepare IP addrinfo hints
+ pushl $AddrInfoL # IP addrinfo length
+ pushl $0
+ lea ai(bp), a
+ mov a, aiP(bp) # -->IP addrinfo
+ push a
+ call memset # clear addrinfo
+ movl $2, ai_family(a) # AF_INET
+ movl $0, ai_flags(a)
+ movl $0, ai_protocol(a) # any protocol
+ movl $1, ai_socktype(a) # SOCK_STREAM
+# set up port # string
+ mov currSockP(bp), a # -->curr socket info
+ pushl So.remPort(a) # remote port#
+ pushl $0f
+ lea str(bp), a
+ push a # -->port# string
+ call sprintf
+ jmp 1f
+0: .ascii "%u\0"
+# get IP addrinfo block chain
+1: lea aiP(bp), a # -->-->IP addrinfo
+ push a
+ pushl aiP(bp) # -->IP addrinfo hints
+ lea str(bp), a # -->remote port # string
+ push a
+ pushl $0f
+ call getaddrinfo
+ jmp 1f
+0: .ascii "localhost\0"
+# check IP addr
+1: cmp $0, a # getaddrinfo OK ?
+ jz 0f # yes
+ push a
+ call gai_strerror
+ push a
+ LOG 0, "getaddrinfo error: %s, ABEND", 1
+ push $1
+ call exit # ABEND
+# log assigned IP addr
+0: pushl deP(bp) # -->Debug inst
+ mov aiP(bp), a # -->IP addrinfo
+ pushl ai_addrP(a) # -->sockaddr
+ lea str(bp), a
+ push a # -->string buf
+ call gai
+ LOG 5, "getaddrinfo OK, %s", 1
+# allocate comm socket
+ mov aiP(bp), a # -->IP addrinfo
+ pushl ai_protocol(a)
+ pushl ai_socktype(a)
+ pushl ai_family(a)
+# call socket # allocate comm socket
+# cmp $0, a
+# jg 0f
+# SYSERR "socket alloc"
+#0:
+ SYS socket
+ mov currSockP(bp), c # -->curr SocketInfo
+ mov a, So.sc(c) # put new socket # into SocketInfo
+# connect to remote partner server side
+ mov C.csP, a
+ mov C.connTh(a), a # conn retry threshold
+ mov a, retry(bp) # initilize retry counter
+ mov aiP(bp), a # -->IP addrinfo
+ pushl ai_addrlen(a)
+ pushl ai_addrP(a)
+ pushl So.sc(c) # socket #
+2: call connect
+ cmp $0, a
+ jnl 0f # connected
+ call __errno_location # get --> sys errno
+ cmp $111, (a) # ECONNREFUSED ?
+ je 1f # yes, wait to retry
+ SYSERR "connect" # no, ABEND
+1: pushl $22000 # 22 msecs
+ call usleep
+ lea 4(sp), sp
+ sub $1, retry(bp)
+ jnz 2b # iterate
+# conn retry threshold reached
+ mov C.csP, a
+ pushl C.connTh(a)
+ LOG 0, "connection refused, threshold %d reached", 1
+ pushl $1
+ call exit # ABEND
+0:
+# connected, account new connection
+ mov C.csP, c # -->CS
+ mov C.shP(c), a # -->shared mem
+ lea S.counter_sem(a), a # -->overall counter semaphore
+ push a
+ SYS sem_wait
+ mov C.csP, c # -->CS
+ mov C.shP(c), a # -->shared mem
+ incl S.conns(a) # incr conns counter
+ lea S.counter_sem(a), a # -->overall msg counter semaphore
+ push a
+ SYS sem_post
+# prepare SSL
+ cmp $1, N.ssl(b) # node's ssl switch
+ jne N.connRet # no SSL
+
+ LOG 5, "prepare for SSL"
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_new # create a SSL structure
+
+ mov currSockP(bp), c # -->curr SocketInfo
+ mov a, So.sslP(c) # put -->SSL struct into SocketInfo
+ test a, a
+ jnz 0f
+ SSLERR "new SSL"
+# connect on SSL
+0: call ERR_clear_error
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sc(c) # socket
+ pushl So.sslP(c) # -->SSL struct
+ call SSL_set_fd # connect the SSL object with a file descriptor
+ test a, a
+ jnz 0f
+ SSLERR "client SSL set fd"
+0:
+ LOG 5, "SSL fd set"
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sslP(c) # -->SSL struct
+ call SSL_connect
+ test a, a # connected ?
+ jns 1f # yes
+ push a
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sslP(c) # -->SSL struct
+ call SSL_get_error # SSL_get_error(-->ssl, err)
+ cmp $5, a # SSL_ERROR_SYSCALL ?
+ jne 0f # no, other SSL err
+ SYSERR "SSL connect"
+0: SSLERR "SSL connect"
+1:
+ mov C.csP, a
+ mov C.connTh(a), a
+ sub retry(bp), a # # of retries = conn threshold - counter
+ push a
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sc(c) # socket#
+ LOG 2, "connected via sc=%d after %d retries", 2
+N.connRet:
+ EPILOG
+#-----------------------------------------------
+# A C C E P T C O N N E C T I O N
+#-----------------------------------------------
+ ARGS
+ DS i # socket rank
+# returns
+# nothing
+# local vars
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug inst
+ DL sa, SockAddrL # sockaddr
+ DL str, 64 # string buf
+ DL currSockP # -->curr SocketInfo save area
+ EPILOC
+#-----------------------------------------------
+ .global N.acc
+N.acc:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ LOG 4, "accepting..."
+
+ mov N.srvSideP(b), c # -->SocketInfo array
+ mov $SocketInfoL, a
+ mull i(bp) # curr SocketInfo offset
+ lea (c, a), a # -->curr SocketInfo
+ mov a, currSockP(bp) # save -->curr SocketInfo
+# accept
+ pushl $0
+ pushl $0
+# mov thisP, b # -->Node inst
+ pushl N.ssc(b) # listen socket
+# call accept
+# cmp $0, a
+# jnl 0f
+# SYSERR "accept"
+#0:
+ SYS accept
+ mov currSockP(bp), c # -->Node inst
+ mov a, So.sc(c) # save comm socket of accepted connection
+ pushl $0f # -->sa length
+ lea sa(bp), a # -->sockaddr
+ push a
+ pushl So.sc(c) # comm socket
+ call getpeername
+ jmp 1f
+0: .int SockAddrL
+1: pushl deP(bp)
+ lea sa(bp), a # -->sockaddr
+ push a
+ lea str(bp), a # -->string buf
+ push a
+ call gai # get string of addr/port
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sc(c) # comm socket
+ LOG 3, "peer: sc=%d, %s", 2
+# prepare SSL
+ mov thisP(bp), b # -->this Node
+ cmp $1, N.ssl(b) # node's ssl switch
+ jne 1f # no SSL
+
+ pushl N.ctxP(b) # -->SSL context
+ call SSL_new # create a SSL structure
+ mov currSockP(bp), c # -->curr SocketInfo
+ mov a, So.sslP(c) # put -->SSL struct into SocketInfo
+ test a, a
+ jnz 0f
+ SSLERR "new SSL"
+# accept on SSL
+0: call ERR_clear_error
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sc(c) # socket #
+ pushl So.sslP(c) # -->SSL struct
+ call SSL_set_fd # connect the SSL object with a file descriptor
+ test a, a
+ jnz 0f
+ SSLERR "client SSL set fd"
+0:
+ LOG 5, "SSL fd set"
+ call SSL_accept
+ test a, a
+ jns 1f
+ mov a, 4(sp)
+ call SSL_get_error # SSL_get_error(-->ssl, err)
+ cmp $5, a # SSL_ERROR_SYSCALL ?
+ jne 0f # no, other SSL err
+ SYSERR "SSL accept"
+0: SSLERR "SSL accept"
+1:
+ LOG 2, "accepted"
+ EPILOG
+#-----------------------------------------------
+# R E A D F R O M C O M M S O C K E T
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+# returns length read
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug inst
+ DL shP # -->shared mem
+ DL currSockP # -->curr SocketInfo
+ DL n # len read
+ DL rest # len to be read
+ DL buf # read dest
+ EPILOC
+#-----------------------------------------------
+ .global N.read
+N.read:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+
+ mov C.csP, a # -->CS
+ mov C.shP(a), a # -->shared mem
+ mov a, shP(bp) # save -->shared mem
+ push N.dataP(b)
+ call Da.getDataLen
+ mov a, rest(bp) # init len to be read
+ call Da.getContP
+ mov a, buf(bp) # init read dest
+ mov N.srvSideP(b), c # -->server side SocketInfo array
+ mov $SocketInfoL, a
+ mull sci(bp) # curr SocketInfo offset
+ lea (c, a), c # -->curr SocketInfo
+ mov c, currSockP(bp) # save -->curr SocketInfo
+
+ push So.sc(c) # comm socket
+ push rest(bp) # len to be read
+ LOG 5, "ready to read len=%d from sc=%d..."
+# iterate on reading until whole length is read
+N.readIterate:
+ cmp $0, rest(bp) # something to be read ?
+ jng N.readAcc # no, end reading
+ push rest(bp) # len to be read
+ push buf(bp) # read dest
+ mov currSockP(bp), c # curr SocketInfo
+ cmp $1, N.ssl(b) # SSL ?
+ jne 0f # no
+# read w/ SSL
+ push So.sslP(c) # -->SSL struct
+ SYS SSL_read
+ jmp 1f
+# read w/o SSL
+0:
+ push So.sc(c) # socket #
+ SYS read
+# adjust read controls
+1:
+ jz N.readEof # EOF read
+ add a, buf(bp) # adjust -->read dest
+ sub a, rest(bp) # decrement rest to read
+ cmp $0, rest(bp) # everything read ?
+ jng 0f
+ push a
+ LOG 5, "partly read %d bytes"
+0: jmp N.readIterate
+# account data just read
+N.readAcc:
+ mov shP(bp), a # -->shared mem
+ lea S.counter_sem(a), a # overall msg counter semaphore
+ push a
+ SYS sem_wait # get semaphore
+ mov shP(bp), a
+ incl S.msgs(a) # incr msgs counter
+ lea S.counter_sem(a), a # overall msg counter semaphore
+ push a
+ SYS sem_post # post semaphore
+
+ lea -4(sp), sp # adjust stacker
+ push N.dataP(b) # -->Data
+ call Da.getPort
+ mov a, 4(sp) # remote listen port
+ call Da.getDataLen
+ mov a, (sp) # data length
+ LOG 5, "%d read from node %u"
+ mov (sp), a # return len read
+ jmp N.readRet
+# EOF read, finish
+N.readEof:
+ LOG 4, "EOF read"
+N.readRet:
+ EPILOG_R
+#-----------------------------------------------
+# W R I T E T O C O M M S O C K E T
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+# returns length written
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug inst
+ DL currSockP # -->curr SocketInfo
+ DL n # len written
+ DL rest # len to be written
+ DL buf # write orig
+ EPILOC
+#-----------------------------------------------
+ .global N.write
+N.write:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+# init control values
+ mov C.csP, a # -->CS
+ mov C.shP(a), a # -->shared mem
+ mov a, shP(bp) # save -->shared mem
+ push N.dataP(b) # -->Data
+ call Da.getDataLen
+ mov a, rest(bp) # init len to be written
+ call Da.getContP
+ mov a, buf(bp) # init write orig
+ mov N.cliSideP(b), c # -->SocketInfo array
+ mov $SocketInfoL, a
+ mull sci(bp) # curr SocketInfo offset
+ lea (c, a), c # -->curr SocketInfo
+ mov c, currSockP(bp) # save -->curr SocketInfo
+
+ push So.sc(c) # comm socket
+ push rest(bp) # len to be written
+ LOG 5, "ready to write len=%d to sc=%d..."
+# iterate on writing until whole length is written
+N.writeIterate:
+ cmp $0, rest(bp) # something to be written ?
+ jng N.writeEnd # no, end writing
+ push rest(bp) # len to be written
+ push buf(bp) # write orig
+ cmp $1, N.ssl(b) # SSL ?
+ jne 0f # no
+# SSL
+ pushl So.sslP(c) # -->SSL struct
+ SYS SSL_write
+ jmp 1f
+# no SSL
+0:
+ pushl So.sc(c) # socket
+ SYS write
+# adjust write controls
+1:
+ mov a, n(bp) # save len written
+ add a, buf(bp) # adjust write orig
+ sub a, rest(bp) # decrement rest to write
+ cmp $0, rest(bp) # everything written ?
+ jng 0f
+ push a
+ LOG 5, "partly written %d bytes"
+0: jmp N.writeIterate
+# return datalen
+N.writeEnd:
+ mov N.dataP(b), a
+ mov Da.datalen(a), a
+ push a
+ LOG 5, "%d written", 1
+ EPILOG_R
+#-----------------------------------------------
+# G E T D A T A F R O M C O M M S O C K E T
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+# returns boolean: 0 - EOF read, 1 - data read
+ PROLOC
+ DL thisP # -->this Node
+ DL deP
+ EPILOC
+#-----------------------------------------------
+ .global N.get
+N.get:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ push sci(bp) # SocketInfo rank
+ LOG 5, "get data from socket, rank=%u"
+ call N.read
+ test a, a
+ jz 0f
+ mov $1, a
+0: EPILOG_R
+#-----------------------------------------------
+# P U T D A T A T O C O M M S O C K E T
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+# returns boolean: 0 - nothing written, 1 - data written
+ PROLOC
+ DL thisP # -->this Node
+ DL deP
+ EPILOC
+#-----------------------------------------------
+ .global N.put
+N.put:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ LOG 5, "put data to socket"
+
+ mov N.dataP(b), d # -->Data
+ mov Da.contP(d), a # -->container
+ lea Co.hdr(a), a # -->header
+ mov N.locPort(b), c
+ mov c, H.lport(a) # listen port to header
+
+ pushl sci(bp) # SocketInfo rank
+ call N.write
+ test a, a
+ jz 0f
+ mov $1, a
+0: EPILOG_R
+#-----------------------------------------------
+# C L O S E C O N N E C T I O N S O C K E T
+#-----------------------------------------------
+ ARGS
+ DS sci # SocketInfo rank
+ DS server # indicate server or client
+ DS deP # -->Debug inst
+# returns:# nothing
+ PROLOC
+ DL thisP # -->this Node
+ DL currSockP # -->curr SocketInfo
+ DL tagP
+ DL e
+ EPILOC
+N.srvSideTag: .asciz "server side"
+N.cliSideTag: .asciz "client side"
+#-----------------------------------------------
+ .global N.closeSocket
+N.closeSocket:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ mov N.srvSideP(b), c # -->SocketInfo array
+ movl $N.srvSideTag, tagP(bp)
+ cmp $1, server(bp) # server side ?
+ je 0f
+ mov N.cliSideP(b), c # client side
+ movl $N.cliSideTag, tagP(bp)
+0: mov $SocketInfoL, a
+ mull sci(bp) # curr SocketInfo offset
+ lea (c, a), c
+ mov c, currSockP(bp) # save -->curr SocketInfo
+ push So.sc(c) # socket to close
+ push tagP(bp)
+ LOG 5, "closing %s sc=%u...", 2
+
+ cmp $0, N.ssl(b) # SSL ?
+ je 2f # no
+# SSL shutdown
+ pushl So.sslP(c) # -->SSL structure
+ call SSL_shutdown
+ cmp $0, a
+ jns 0f # no err
+ SSLERR "SSL shutdown (1)"
+0: jnz 2f # SSL shutdown finished
+ LOG 5, "SSL shutdown rc=0, retry"
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sslP(c) # -->SSL structure
+ call SSL_shutdown
+ cmp $0, a
+ jg 2f # SSL shutdown finished
+ push a # SSL err
+ mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sslP(c) # -->SSL structure
+ call SSL_get_error
+ cmp $5, a # SSL_ERROR_SYSCALL ?
+ jne 1f # no, other SSL err
+# syscall err
+ call ERR_get_error
+ test a, a # SSL err ?
+ jnz 2f # yes, don't care
+ call __errno_location # sys err ?
+ test a, a
+ jz 2f # no
+ SYSERR "SSL shutdown (2)"
+# other SSL err, print SSL err queue
+1: SSLERR "SSL shutdown (2)"
+# close socket
+2: mov currSockP(bp), c # -->curr SocketInfo
+ pushl So.sc(c) # socket to close
+# call close
+# test a, a
+# jz 0f
+# SYSERR "close"
+#0:
+ SYS close
+ push tagP(bp)
+ LOG 4, "%s sc=%u closed", 2
+ mov currSockP(bp), c # -->curr SocketInfo
+ movl $-1, So.sc(c) # indicate closed socket
+ EPILOG
+#-----------------------------------------------
+# C R E A T E T H R E A D T O C L O S E C L I E N T S I D E S
+#-----------------------------------------------
+ ARGS
+ PROLOC
+ DL thisP # -->this Node
+ DL deP # -->Debug inst
+ DL pt # pthread_t (int)
+ EPILOC
+#-----------------------------------------------
+N.closeClients:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save local -->Debug
+ cmp $1, N.closing(b)
+ je 0f # already closing, nothing to do
+ LOG 5, "spawning thread to close client side"
+ push b # -->this (param for thread)
+ pushl $N.closeCliThread # -->thread func
+ pushl $0
+ lea N.ptid(b), a # -->thread ID
+ push a
+ call pthread_create
+ test a, a
+ jz 1f
+ SYSERR "create thread"
+1: mov thisP(bp), b # -->this Node
+ movl $1, N.closing(b) # indicate "closing"
+ push N.ptid(b)
+ LOG 5, "closing thread (%u) spawned"
+0: EPILOG
+#-----------------------------------------------
+# C L O S E C L I E N T S I D E S
+#-----------------------------------------------
+ ARGS
+ DS thisP
+ PROLOC
+ DL deP # -->Debug inst
+ DL debug, DebugL
+ EPILOC
+#-----------------------------------------------
+N.closeCliThread:
+ PROLOG
+ mov thisP(bp), b # -->this Node
+ lea debug(bp), a
+ mov a, deP(bp) # save deP
+ lea N.debug(b), a
+ lea D.id(a), a # -->caller debug id
+ push a
+ DEBID "%s CLOSE clients", 1
+ LOG 5, "start..."
+ mov $0, c # offet in socket info array
+ mov $0, d # socket info rank
+ mov N.cliSideP(b), a # -->client socket info array
+0: # iterate on opened sockets
+ cmp $-1, So.sc(a, c) # sc allocated ?
+ je 1f # no, iterate
+ push So.sc(a, c) # socket#
+ LOG 4, "sc=%u"
+ push deP(bp)
+ pushl $0 # indicate "client"
+ push d # socket info rank
+ call N.closeSocket # close socket
+1: inc d
+ cmp N.nodes(b), d # rank < nodes ?
+ je 0f # no, end
+ lea SocketInfoL(c), c # incr offset
+ jmp 0b # iterate
+0: LOG 4, "finished, exiting thread"
+ pushl $0
+ call pthread_exit
+ EPILOG
+#-----------------------------------------------
+# S Y N C H R O N I Z E A L L N O D E S
+#-----------------------------------------------
+ ARGS
+ PROLOC
+ DL thisP
+ DL deP
+ DL shP
+ DL tosignal
+ DL sigmask, 128
+ DL sigact, SigActionL
+ EPILOC
+N.sync:
+ PROLOG
+ mov b, thisP(bp) # save -->this Node
+ lea N.debug(b), a
+ mov a, deP(bp) # save -->Debug locally for LOG macro
+
+ movl $0, tosignal(bp)
+ mov C.csP, c # -->CS
+ mov C.shP(c), a # -->shared mem
+ push S.act(a)
+ mov a, shP(bp)
+ push N.pid(b)
+ LOG 5, "%u synchronizing: active=%u"
+# set signal handler block
+ lea sigact(bp), a
+ movl $N.sighandle, sa_handler(a)
+ movl $0, sa_flags(a)
+ lea sa_mask(a), a # set handler mask
+ push a
+ SYS sigfillset
+ jmp 0f
+N.sighandle:
+ ret
+0:
+# activate signal handler
+ lea sigmask(bp), a
+ push a
+ SYS sigemptyset
+ push $SIGUSR2
+ lea sigmask(bp), a
+ push a
+ SYS sigaddset
+ push $0
+ lea sigmask(bp), a
+ push a
+ push $SIG_UNBLOCK
+ SYS sigprocmask
+ push $0
+ lea sigact(bp), a
+ push a
+ push $SIGUSR2
+ SYS sigaction
+# update nodes counter
+ mov shP(bp), a
+ lea S.counter_sem(a), a
+ push a
+ SYS sem_wait # get semaphore
+ mov shP(bp), a
+ decl S.act(a) # decrement nodes counter
+ setz tosignal(bp)
+ mov shP(bp), a
+ lea S.counter_sem(a), a
+ push a
+ SYS sem_post # post semaphore
+ testl $1, tosignal(bp) # all finished?
+ jz N.sigwait
+# signal all tasks
+ push N.pid(b)
+ LOG 5, "%u signaling USR2"
+ push $SIGUSR2
+ push $0
+ SYS kill
+ jmp N.woken
+# wait for signal
+N.sigwait:
+# call pause
+ push $5
+ call sleep
+N.woken:
+ mov thisP(bp), b # -->this Node
+ push N.pid(b)
+ LOG 5, "%u woken up"
+ EPILOG
+#-----------------------------------------------
+# F O R M A T A N D P R I N T S S L E R R O R
+#-----------------------------------------------
+# args
+ ac = 8
+ DS deP # -->Debug
+ DS msgP # -->accompanying msg str
+# returns: nothing
+ PROLOC
+ DL str, 256 # generated string buf
+ EPILOC
+#-----------------------------------------------
+ .global N.sslErr
+N.sslErr:
+ enter $locL, $0
+0:
+ call ERR_get_error
+ push a
+ LOG 5, "ssl err, e=%lu", 1
+ test a, a # end of msg queue ?
+ jz 0f # yes
+ lea str(bp), d
+ push d # -->err msg str buf
+ push a # err
+ call ERR_error_string
+ push a # -->err msg str
+ pushl msgP(bp) # -->accompanying msg str
+ LOG 0, "%s: %s", 2
+ jmp 0b # iterate on SSL err msg queue
+0:
+ pushl $1
+ call exit # ABEND
+#-----------------------------------------------
+# G E T A S S I G N E D I P A D D R
+#-----------------------------------------------
+# puts assigned IP address:port in string
+ ARGS
+ DS sP # -->output string
+ DS saP # -->sockaddr
+ DS deP # -->Debug
+# returns: nothing
+ PROLOC
+ EPILOC
+#-----------------------------------------------
+gai: PROLOG
+
+ mov saP(bp), b # -->sockaddr
+ xor a, a
+ mov sa_data(b), %ax # port # in big endian order
+ push a
+ call ntohs
+ mov a, d
+
+ mov saP(bp), b # -->sockaddr
+ pushl sa_data+2(b) # IP addr in big endian order
+ call ntohl
+
+ push d # port # as int
+
+ mov $4, c # iterate on 4 bytes of IP addr
+0: mov a, d
+ and $0xff, d
+ push d # part of IP addr
+ shr $8, a
+ dec c
+ jnz 0b # iterate
+
+ push $0f
+ pushl sP(bp) # -->output buf
+ call sprintf
+ jmp 1f
+0: .ascii "%u.%u.%u.%u:%d\0"
+1:
+ EPILOG
+#-----------------------------------------------
+ .end