.include "DS.S"
.data
.text
#-----------------------------------------------
# N O D E O P E R A T I O N S
#-----------------------------------------------
ARGS
DS CnstlnP # -->Constellation inst
DS port # node's port#
# returns
# nothing
# local vars
PROLOC
DL this, NodeL # this Node instance
DL thisP # -->this Node
DL deP # -->Debug inst for LOG macro
DL rc
DL tosignal # switch
DL sigact, SigActionL
DL sigmask, 128
DL sigret
EPILOC
#-----------------------------------------------
.global Node
Node:
PROLOG
lea this(bp), b # -->this Node
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save -->Debug locally for LOG macro
# save parm values
mov CnstlnP(bp), c # -->Constellation
mov c, N.cnstlnP(b) # save -->Constellation
mov port(bp), a
mov a, N.locPort(b) # save port#
# set Debug id
push a
cmpl $Cn.ring, Cn.topo(c)
je 0f
push $8f
jmp 1f
0: push $7f
1: cmp $0,Cn.ssl(c)
je 2f
push $6f
jmp 9f
2: push $5f
jmp 9f
5: .asciz "non"
6: .asciz ""
7: .asciz "RING"
8: .asciz "MASH"
9: DEBID "%sSSL %s node %d", 3 # N->ssl ? "" : "non", N.topo==mash ? "MASH" : "RING", locPort
call N.init # initialize node
call N.bind # bind node to local port
push N.kicker(b) # kicker indicator
LOG 5, "kicker=%d"
cmpl $0, N.kicker(b) # kicker ?
je 0f # no
call N.kickOff # start transfer
0:
call N.mainLoop # iterate on socket I/O select
LOG 5, "closing ssc"
push N.ssc(b)
SYS close
movl $0, rc(bp) # rc = OK
# wait on thread closing client side
cmpl $0, N.closing(b) # closing client side ?
je N.chkData # no
push $0
push N.ptid(b) # thread id
call pthread_join
test a, a
jz 1f
push a # unique error handlig for pthread_join
call strerror
push a
LOG 0, "pthread_join: %s"
push deP(bp)
call C.abend
1:
mov thisP(bp), b # -->this Node
push N.pid(b)
LOG 5, "task %u: closing thread returned"
# synchronize with all nodes in all constellations and then check received data
N.chkData:
call N.sync # wait for others
cmpl $0, N.kicker(b) # kicker ?
je 0f # no
push N.dataP(b) # -->Data
call Da.chk
test a, a # data check OK ?
jnz 0f # yes
movl $1, rc(bp) # rc = BAD
LOG 0, "data on INPUT TO and OUTPUT FROM constellation DIFFER"
0:
push N.pid(b)
push rc(bp)
LOG 2, "end of operations, rc=%d, process=%u"
call exit
#-----------------------------------------------
# C O N S T R U C T O R
#-----------------------------------------------
PROLOC
DL thisP # -->this Node
DL deP # -->Debug
# DL CnstlnP # -->Constellation
DL len # various lengths
strL = 128
DL str, strL # string buf
EPILOC
#-----------------------------------------------
N.init: # c: -->Cnstln
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
LOG 4, "initializing..."
# initialize node's attributes
call getpid
mov a, N.pid(b)
mov N.cnstlnP(b), c # -->Constellation
push deP(bp) # -->caller Debug id
lea N.data(b), a
mov a, N.dataP(b)
push a # -->Data
call Data # initialize data item
mov Cn.topo(c), a
mov a, N.topo(b) # save topology
mov Cn.ssl(c), a
mov a, N.ssl(b) # SSL switch
mov Cn.first(c), a
mov a, N.first(b) # save port# of first node in constellation
movl $0, N.kicker(b) # clear kicker switch
cmp a, N.locPort(b) # kicker ? (locPort == first)
sete N.kicker(b)
1: add Cn.nodes(c), a
dec a
mov a, N.last(b) # save port# of last node (last = first + nodes -1)
mov Cn.nodes(c), a
mov a, N.nodes(b) # save num. of nodes in constellation
mov Cn.div(c), a
mov a, N.div(b) # save divisor for random choice (MAX_INT / nodes)
mov Cn.topo(c), a
mov a, N.topo(b)
mov Cn.forwP(c), a
mov a, N.forwP(b) # save -->shared "forward" indicator
movl $0, N.closing(b) # unset closing indicator
# allocate and initialize server side and client side sockets
mov $SocketInfoL, a
mull N.nodes(b) # SocketInfo array len
mov a, N.sockArrLen(b) # save array len
push a
call malloc
cmp $0, a
jnl 0f
SYSERR "malloc"
0: mov thisP(bp), b # -->this Node
mov a, N.srvSideP(b) # save -->array of server side sockets
pushl N.sockArrLen(b)
call malloc
cmp $0, a
jnl 0f
SYSERR "malloc"
0: mov thisP(bp), b # -->this Node
mov a, N.cliSideP(b) # save -->array of client side sockets
# initialize allocated socket infos
xor c, c # offset
0: mov N.srvSideP(b), a # -->server side SocketInfo array
# movl $So.labelV, So.label(a, c) # indicate SocketInfo block
movl $-1, So.sc(a, c) # indicate server side socket is not in use
mov N.cliSideP(b), a # -->client side SocketInfo array
# movl $So.labelV, So.label(a, c) # indicate SocketInfo block
movl $-1, So.sc(a, c) # indicate client side socket is not in use
lea SocketInfoL(c), c # incr by SocketInfo len
cmp c, N.sockArrLen(b)
ja 0b # iterate
cmpl $0, N.ssl(b) # node's ssl switch
jz N.initR # no SSL
# prepare SSL context
# call SSL_load_error_strings
# call SSL_library_init
call OPENSSL_init_ssl
LOG 4, "setting SSL contex...", 0
call TLS_method
push a
call SSL_CTX_new
mov thisP(bp), b # -->this Node
mov a, N.ctxP(b) # save -->new SSL context
test a, a
jnz 0f
SSLERR "new SSL CTX"
# set SSL mode
0: pushl $0
pushl $4 # SSL_MODE_AUTO_RETRY
pushl $33 # SSL_CTRL_MODE
pushl N.ctxP(b) # -->SSL context
call SSL_CTX_ctrl
movl $0, 8(sp) # no callback
movl $2, 4(sp) # mode = SSL_VERIFY_FAIL_IF_NO_PEER_CERT
call SSL_CTX_set_verify # set peer certificate verification parameters
# set X509 key file name
mov thisP(bp), b # -->this Node
pushl N.locPort(b) # local TCP port#
mov C.csP, c
pushl C.cePathP(c) # -->name of SSL path
pushl $0f # -->format
pushl $strL # buf len
lea str(bp), a
push a # -->string buf
call snprintf
jmp 1f
0: .asciz "%skeys/%u.key"
1:
pushl $1 # SSL_FILETYPE_PEM
lea str(bp), a # filename string
push a
LOG 5, "SSL private key used: %s", 1
mov thisP(bp), b # -->this Node inst
pushl N.ctxP(b) # -->SSL context
call SSL_CTX_use_PrivateKey_file
cmp $1, a
je 0f
SSLERR "hh's key file"
# set X509 cert file name
0: mov thisP(bp), b # -->this Node
pushl N.locPort(b) # local TCP port #
mov C.csP, c
pushl C.cePathP(c) # -->name of SSL path
pushl $0f
pushl $strL # buf len
lea str(bp), a # string buf
push a
call snprintf
jmp 1f
0: .asciz "%scerts/%u.pem"
1: mov thisP(bp), b # -->this Node
LOG 5, "SSL certificate used: %s", 1
pushl $1 # SSL_FILETYPE_PEM
lea str(bp), a # filename string
push a
pushl N.ctxP(b) # -->SSL context
call SSL_CTX_use_certificate_file
cmp $1, a
je 0f
SSLERR "hh's cert file"
# set path to CA
0: mov thisP(bp), b # -->this Node
mov C.csP, c
pushl C.caPathP(c) # -->name of SSL path
pushl $0f # -->CApath
LOG 5, "SSL: CA path: %s", 1
pushl $0 # -->CAfile not used
pushl N.ctxP(b) # -->SSL context
call SSL_CTX_load_verify_locations # set default locations for trusted CA certificates
cmp $1, a
je N.initR
SSLERR "hh's thrusted certs path"
0: .asciz "/home/local/etc/ssl/certs/"
N.initR:
LOG 5, "initalized"
EPILOG
#-----------------------------------------------
# K I C K O F F
#-----------------------------------------------
PROLOC
DL thisP # -->this Node
DL deP # -->Debug
DL sci # socket info rank
DL digest, 24
EPILOC
N.kickOff:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
# load payload data
movl C.csP, c # -->CS
push C.txtP(c) # -->initial payload text
push C.ttl(c) # initial TTL
# lea N.data(b), a # -->Data
# push a
push N.dataP(b) # -->Data
call Da.load # load data container
call N.nextNode
mov a, N.next(b) # save next node#
sub N.first(b), a # first - next = socketinfo rank
mov a, sci(bp) # save socket info rank
lea digest(bp), a # -->digest buf
push a
push N.dataP(b) # -->Data
call Da.digest24
call Da.getDataLen
push N.next(b) # next node#
push a # data len
lea digest(bp), a # -->digest
push a
LOG 2, "kicker: ready to initial send %s, len=%u to node %u"
push N.next(b)
push sci(bp)
call N.conn # connect to next node
push sci(bp)
call N.put # send data to next node
EPILOG
#-----------------------------------------------
# M A I N L O O P : I T E R A T E O N S O C K E T I / O
#-----------------------------------------------
PROLOC
DL thisP # -->this Node
DL deP # -->Debug
EPILOC
N.mainLoop:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
# prepare
LOG 5, "preparing for I/O select..."
call N.clearSocketMasks # clear socket masks for select and zero nfds
call N.maskSsc # mask ssc for select
# iterate while there are some sockets masked for select (nfds > 0)
0: cmp $0, N.nfds(b) # any I/O in progress ?
jz 0f # no, end operations of node
call N.selectSocketIo # select on server side sockets and forward data
call N.clearSocketMasks # clear socket masks for select and zero nfds
call N.maskSsc # mask ssc for select
call N.maskSrvSockets # mask all connected server side sockets for select
jmp 0b
0: EPILOG
#-----------------------------------------------
# S E L E C T S O C K E T I / O
#-----------------------------------------------
# select on masked sockets
# upon return from select
# accept connections to ssc
# forward data from posted sockets
#-----------------------------------------------
PROLOC
DL thisP # -->this Node
DL deP # -->Debug
EPILOC
N.selectSocketIo:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
push N.nfds(b)
lea N.rs(b), a
push (a)
push 4(a)
push N.ssc(b)
LOG 5, "select ssc=%u, mask=%08x %08x, nfds=%u"
lea N.t(b), a # -->timeval
movl $1, Ti.secs(a)
movl $0, Ti.usecs(a)
push a
lea N.es(b), a
push a # -->es (exceptional mask is not used yet)
pushl $0
lea N.rs(b), a # -->rs
push a
push N.nfds(b)
SYS select
lea N.rs(b), a
push (a)
push 4(a)
LOG 5, "return from select, mask of posted=%08x %08x"
# check ssc for incomming connect request, accept it and forward data
lea N.rs(b), a # -->rs select mask
push a
pushl N.ssc(b)
call fd_isset
test a, a # ssc I/O ?
jz N.checkAndForw # no, check other sockets
# prepare for accept; find free srv socket info block for accept
mov N.srvSideP(b), a # -->srv socket info array
xor c, c # offet in socket info array
xor d, d # socket info rank
0: cmp $-1, So.sc(a, c) # socket allocated ?
je 0f # no, free socket found
inc d
lea SocketInfoL(c), c # inc offset into array
cmp c, N.sockArrLen(b)
ja 0b # iterate on sockets
ERR "can't accept, all sockets in use"
# free socket found, accept connection on it and forward data
0: push d
LOG 4, "slot for accept=%d"
call N.acc
call N.forw
# check all server side sockets for pending I/O and call forward on them
N.checkAndForw:
xor c, c # offet into socket info array
lea N.rs(b), a # -->rs select mask
push a
lea -4(sp), sp # adjust stacker for iteration
xor c, c # zero offset
xor d, d # set counter
# iterate through server side sockets and forward from posted sockets
0: mov N.srvSideP(b), a # -->srv socket info array
cmp $-1, So.sc(a, c) # socket connected ?
je 1f # no, iterate
mov So.sc(a, c), a
mov a, (sp) # stack socket#
# LOG 0, "checking port %u"
call fd_isset # socket I/O ?
test a, a
jz 1f # no, iterate
mov N.srvSideP(b), a
mov So.sc(a, c), a
mov d, (sp) # stack socket rank
# LOG 0, "port posted, rank=%u"
call N.forw # forward data
1: inc d
lea SocketInfoL(c), c # inc offset into array
cmp c, N.sockArrLen(b)
ja 0b # iterate on srv side sockets
EPILOG
#-----------------------------------------------
# M A S K C O N N E C T E D S O C K E T S F O R S E L E C T
#-----------------------------------------------
# mask all connected server side socketS for next select
PROLOC
DL thisP # -->this Node
EPILOC
N.maskSrvSockets:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.rs(b), a # -->rs select mask
push a
lea -4(sp), sp # adjust stacker for iteration
xor c, c # zero offet
0: mov N.srvSideP(b), a # -->srv socket info array
cmp $-1, So.sc(a, c) # socket connected ?
je 1f # no, iterate
mov So.sc(a, c), a
mov a, (sp) # stack socket#
call N.maskSocket # mask socket for select
1: lea SocketInfoL(c), c # inc offset into array
cmp c, N.sockArrLen(b)
ja 0b # iterate on srv side sockets
EPILOG
#-----------------------------------------------
# M A S K S S C F O R S E L E C T
#-----------------------------------------------
# mask ssc for select until forward is disabled
PROLOC
DL thisP # -->this Node
EPILOC
N.maskSsc:
PROLOG
mov b, thisP(bp) # save -->this Node
mov N.forwP(b), a
cmp $1, (a) # forwarding enabled ?
jne 0f # no, return
push N.ssc(b)
call N.maskSocket
0: EPILOG
#-----------------------------------------------
# C L E A R S O C K E T M A S K S
#-----------------------------------------------
PROLOC
DL thisP # -->this Node
EPILOC
N.clearSocketMasks:
PROLOG
mov b, thisP(bp) # save -->this Node
movl $0, N.nfds(b) # zero nfds
lea N.rs(b), a # zero rs select mask
push a
call fd_zero
lea N.es(b), a # zero es select masks
push a
call fd_zero
EPILOG
#-----------------------------------------------
# M A S K S O C K E T F O R S E L E C T
#-----------------------------------------------
ARGS
DS sc # socket#
PROLOC
DL thisP # -->this Node
EPILOC
N.maskSocket:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.rs(b), a # -->rs select mask
push a
pushl sc(bp)
call fd_set
mov sc(bp), a # socket#
cmp a, N.nfds(b) # nfds > sc ?
jg 0f # yes
inc a
mov a, N.nfds(b) # nfds = sc + 1
0: EPILOG
#-----------------------------------------------
# F D S E T O P E R A T I N O S
#-----------------------------------------------
PROLOC # local data frame def. is used by 4 following subrs
DL thisP # -->this Node
dL deP
EPILOC
fd_zero:
# zero FD mask
ARGS
DS fdsetP # -->FD mask
PROLOG
mov fdsetP(bp), a
xor c, c
0: movl $0, (a, c, 4)
inc c
cmp $32, c # size of FD mask is 32 int (1024 bits)
jl 0b
EPILOG
fd_set:
# mask socket in FD mask
ARGS
DS sc
DS fdsetP # -->FD mask
PROLOG
movl sc(bp), a # sc
call fd_mask # a: integer offset, d: "1" bit in position according to sc
mov fdsetP(bp), c # -->select mask
or d, (c, a) # set sc mask
EPILOG
fd_clear:
# clear socket from FD mask
ARGS
DS sc
DS fdsetP # -->FD mask
PROLOG
movl sc(bp), a # sc
call fd_mask # a: integer offset, d: "1" bit in position according to sc
mov fdsetP(bp), c # -->select mask
not d
and d, (c, a) # clear sc mask
EPILOG
fd_isset:
# test socket bit status in FD mask
ARGS
DS sc
DS fdsetP # -->FD mask
PROLOG
movl sc(bp), a # sc
call fd_mask # a: integer offset, d: "1" bit in position according to sc
mov fdsetP(bp), c # -->select mask
and (c, a), d # sc selected ?
mov d, a
# mov $0, a
# jz 0f # not selected
# mov $1, a
0: EPILOG_R
fd_mask: # a: sc
# adjust offsets into FD mask
xor d, d
movl $32, c
divl c # a: mask integer rank, c: bit offset
shll $2, a # a: mask integer offset
mov d, c
mov $1, d
shl %cl, d # d: "1" bit in position according to sc
ret # a: offset of mask integer
#-----------------------------------------------
# F O R W A R D D A T A T O O T H E R N O D E
#-----------------------------------------------
ARGS
DS sci # SocketInfo rank
PROLOC
DL thisP # -->this Node
DL deP # -->Debug inst
DL digest, 24
EPILOC
#-----------------------------------------------
N.forw:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
push sci(bp) # socket info rank
call N.get
test a, a # data read ?
jz N.forwCloseSock # no, EOF, close this socket and start closing clients
# kicker checks TTL
cmp $1, N.kicker(b) # kicker ?
jne 1f # no, continue forwarding
lea -4(sp), sp # adjust stacker
lea digest(bp), a
push a # -->digest buf
push N.dataP(b) # -->Data
call Da.ttl
mov a, 8(sp) # TTL from data header
call Da.digest24
mov a, 4(sp) # -->text digest
call Da.getPort
mov a, (sp) # remote listen port from data header
LOG 5, "received from node %u: %s, ttl=%d"
push N.dataP(b) # -->Data
call Da.dttl # decrement TTL in data
test a, a # TTl > 0 ?
jz N.forwKickerStop # no, disable forwarding in constellation
# forward data
1: call N.nextNode # calculate next node
mov a, N.next(b)
push a
LOG 4, "next node %u"
sub N.first(b), a # socket info rank (next - first)
mov a, sci(bp) # save
# connect if not connected yet
movl $SocketInfoL, d
mul d # a: offset into socket info array
mov N.cliSideP(b), c # -->client side socket info array
cmp $-1, So.sc(c, a) # connected ?
jne 0f # yes
push N.next(b)
push sci(bp)
call N.conn
0:
lea -4(sp), sp
push N.dataP(b) # -->Data
call Da.ttl
mov a, 4(sp) # ttl
call Da.getDataLen
mov a, (sp) # container len
push N.next(b) # next node#
LOG 5, "forwarding to %d, len=%d, ttl=%d --->"
# pacing
mov C.csP, c # -->CS
cmp $0, C.pacing(c) # pacing active ?
je 0f # no
pushl $0
lea C.pace.tv_sec(c), a
push a
SYS nanosleep # pace
0: push sci(bp)
call N.put
LOG 4, "leaving forward"
jmp N.forwR # exit forwarding
N.forwKickerStop: # ttl = 0
lea digest(bp), a
push a # -->digest buf
push N.dataP(b) # -->Data
call Da.digest24
push a
LOG 1, "received after finally passing all: %s", 1
mov N.forwP(b), a # -->shared forw indicator
movl $0, (a) # disable forwarding for all nodes in constellation
jmp N.forwCloseCli
N.forwCloseSock:
pushl deP(bp)
pushl $1 # indicate "server side"
push sci(bp) # socket info rank
call N.closeSocket # close this server side socket
N.forwCloseCli:
call N.closeClients # launch client side closing thread
LOG 4, "leaving forward, closing"
N.forwR:
EPILOG
#-----------------------------------------------
# D E T E R M I N E N E X T N O D E
#-----------------------------------------------
PROLOC
DL thisP # -->this Node
EPILOC
N.nextNode:
PROLOG
mov b, thisP(bp) # save -->this Node
cmp $Cn.ring, N.topo(b) # ring ?
jne 0f # no
# ring
mov N.locPort(b), a
inc a # next = locPort + 1
cmp a, N.last(b) # next > last ?
jnl N.nextNodeR
mov N.first(b), a
jmp N.nextNodeR
# mash
0: call random
xor d, d
mov thisP(bp), b
divl N.div(b) # 0 <= random < nodes
add N.first(b), a # first + random
cmp a, N.locPort(b)
je 0b # iterate until other then local node is selected
N.nextNodeR:
push a
EPILOG_R
#-----------------------------------------------
# B I N D T O L O C A L T C P P O R T
#-----------------------------------------------
# returns
# nothing
PROLOC
DL thisP # -->this Node
DL deP # -->Debug
DL i
DL aiP # -->IP addrinfo
strL = 64
DL str, strL # string buf
EPILOC
#-----------------------------------------------
.global N.bind
N.bind:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
LOG 4, "binding...", 0
# prepare IP addr
pushl $AddrInfoL
call malloc # alloc addrinfo for hints
mov a, aiP(bp)
pushl $AddrInfoL
pushl $0
push a
call memset # clear hints
movl $2, ai_family(a) # AF_INET
movl $1, ai_flags(a) # AI_PASSIVE for bind
movl $0, ai_protocol(a) # any protocol
movl $1, ai_socktype(a) # SOCK_STREAM, blocking type
mov thisP(bp), b # -->this Node
pushl N.locPort(b) # local port #
push $0f
lea str(bp), a
push a # -->str buff
call sprintf
jmp 1f
0: .ascii "%d\0"
1:
lea aiP(bp), a
push a # -->-->addrinfo for new addrinfo
pushl aiP(bp) # -->addrinfo hints
lea str(bp), a
push a # -->loc port # string
pushl $0f
call getaddrinfo
jmp 1f
0: .ascii "localhost\0"
# check IP addr
1: cmp $0, a # getaddrinfo OK ?
jz 0f # yes
push a
call gai_strerror
push a
LOG 0, "getaddrinfo error: %s, ABEND", 1
push $1
call exit
# log assigned IP addr
0: pushl deP(bp)
mov aiP(bp), a # -->addrinfo
pushl ai_addrP(a) # -->sockaddr
lea str(bp), a
push a # -->string buf
call gai
lea str(bp), a
push a
LOG 5, "getaddrinfo OK, %s", 1
# allocate socket
mov aiP(bp), a
pushl ai_protocol(a)
pushl ai_socktype(a)
pushl ai_family(a)
SYS socket
mov a, N.ssc(b) # save ssc
# set socket option SO_REUSEADDR
pushl $4 # integer width
lea 9f, a # value 1
push a
pushl $2 # SO_REUSEADDR
pushl $1 # SOL_SOCKET
push N.ssc(b) # ssc
SYS setsockopt
jmp 8f
9: .long 1
8:
# bind
mov aiP(bp), a
pushl ai_addrlen(a)
pushl ai_addrP(a)
push N.ssc(b) # ssc
SYS bind
# listen
pushl $1 # pend conns queue len
pushl N.ssc(b) # ssc
SYS listen
pushl N.locPort(b)
LOG 2, "bound to %d", 1
N.bindR:
EPILOG
#-----------------------------------------------
# C O N N E C T T O R E M O T E N O D E
#-----------------------------------------------
# args
ac = 40
DS i # socket rank
DS remPort # TCP port# on remote site
# returns
# nothing
# local vars
ac = 0
DL debug, DebugL # Debug inst
DL deP # -->Debug inst
DL retry # retry counter
DL ai, AddrInfoL # IP addrinfo
DL aiP # -->IP addrinfo
DL str, 64 # string buf
DL currSockP # -->SocketInfo save area
locL = ac
#-----------------------------------------------
.global N.conn
N.conn:
PROLOG
mov b, thisP(bp) # save -->this Node
lea debug(bp), a # -->own local debug block
mov a, deP(bp) # save -->local Debug
pushl remPort(bp) # remote port
lea N.debug(b), d # -->node's Debug
lea D.id(d), a # -->node's debug ID
push a
DEBID "%s to %u", 2 # set local debug ID
LOG 3, "connecting...", 0
mov N.cliSideP(b), c # -->SocketInfo array
mov $SocketInfoL, a
mull i(bp) # SocketInfo offset
lea (c, a), c # -->curr SocketInfo
mov c, currSockP(bp) # save -->curr SocketInfo
mov remPort(bp), a
mov a, So.remPort(c) # put remote port # into curr SocketInfo
# prepare IP addrinfo hints
pushl $AddrInfoL # IP addrinfo length
pushl $0
lea ai(bp), a
mov a, aiP(bp) # -->IP addrinfo
push a
call memset # clear addrinfo
movl $2, ai_family(a) # AF_INET
movl $0, ai_flags(a)
movl $0, ai_protocol(a) # any protocol
movl $1, ai_socktype(a) # SOCK_STREAM
# set up port # string
mov currSockP(bp), a # -->curr socket info
pushl So.remPort(a) # remote port#
pushl $0f
lea str(bp), a
push a # -->port# string
call sprintf
jmp 1f
0: .ascii "%u\0"
# get IP addrinfo block chain
1: lea aiP(bp), a # -->-->IP addrinfo
push a
pushl aiP(bp) # -->IP addrinfo hints
lea str(bp), a # -->remote port # string
push a
pushl $0f
call getaddrinfo
jmp 1f
0: .ascii "localhost\0"
# check IP addr
1: cmp $0, a # getaddrinfo OK ?
jz 0f # yes
push a
call gai_strerror
push a
LOG 0, "getaddrinfo error: %s, ABEND", 1
push $1
call exit # ABEND
# log assigned IP addr
0: pushl deP(bp) # -->Debug inst
mov aiP(bp), a # -->IP addrinfo
pushl ai_addrP(a) # -->sockaddr
lea str(bp), a
push a # -->string buf
call gai
LOG 5, "getaddrinfo OK, %s", 1
# allocate comm socket
mov aiP(bp), a # -->IP addrinfo
pushl ai_protocol(a)
pushl ai_socktype(a)
pushl ai_family(a)
# call socket # allocate comm socket
# cmp $0, a
# jg 0f
# SYSERR "socket alloc"
#0:
SYS socket
mov currSockP(bp), c # -->curr SocketInfo
mov a, So.sc(c) # put new socket # into SocketInfo
# connect to remote partner server side
mov C.csP, a
mov C.connTh(a), a # conn retry threshold
mov a, retry(bp) # initilize retry counter
mov aiP(bp), a # -->IP addrinfo
pushl ai_addrlen(a)
pushl ai_addrP(a)
pushl So.sc(c) # socket #
2: call connect
cmp $0, a
jnl 0f # connected
call __errno_location # get --> sys errno
cmp $111, (a) # ECONNREFUSED ?
je 1f # yes, wait to retry
SYSERR "connect" # no, ABEND
1: pushl $22000 # 22 msecs
call usleep
lea 4(sp), sp
sub $1, retry(bp)
jnz 2b # iterate
# conn retry threshold reached
mov C.csP, a
pushl C.connTh(a)
LOG 0, "connection refused, threshold %d reached", 1
pushl $1
call exit # ABEND
0:
# connected, account new connection
mov C.csP, c # -->CS
mov C.shP(c), a # -->shared mem
lea S.counter_sem(a), a # -->overall counter semaphore
push a
SYS sem_wait
mov C.csP, c # -->CS
mov C.shP(c), a # -->shared mem
incl S.conns(a) # incr conns counter
lea S.counter_sem(a), a # -->overall msg counter semaphore
push a
SYS sem_post
# prepare SSL
cmp $1, N.ssl(b) # node's ssl switch
jne N.connRet # no SSL
LOG 5, "prepare for SSL"
pushl N.ctxP(b) # -->SSL context
call SSL_new # create a SSL structure
mov currSockP(bp), c # -->curr SocketInfo
mov a, So.sslP(c) # put -->SSL struct into SocketInfo
test a, a
jnz 0f
SSLERR "new SSL"
# connect on SSL
0: call ERR_clear_error
mov currSockP(bp), c # -->curr SocketInfo
pushl So.sc(c) # socket
pushl So.sslP(c) # -->SSL struct
call SSL_set_fd # connect the SSL object with a file descriptor
test a, a
jnz 0f
SSLERR "client SSL set fd"
0:
LOG 5, "SSL fd set"
mov currSockP(bp), c # -->curr SocketInfo
pushl So.sslP(c) # -->SSL struct
call SSL_connect
test a, a # connected ?
jns 1f # yes
push a
mov currSockP(bp), c # -->curr SocketInfo
pushl So.sslP(c) # -->SSL struct
call SSL_get_error # SSL_get_error(-->ssl, err)
cmp $5, a # SSL_ERROR_SYSCALL ?
jne 0f # no, other SSL err
SYSERR "SSL connect"
0: SSLERR "SSL connect"
1:
mov C.csP, a
mov C.connTh(a), a
sub retry(bp), a # # of retries = conn threshold - counter
push a
mov currSockP(bp), c # -->curr SocketInfo
pushl So.sc(c) # socket#
LOG 2, "connected via sc=%d after %d retries", 2
N.connRet:
EPILOG
#-----------------------------------------------
# A C C E P T C O N N E C T I O N
#-----------------------------------------------
ARGS
DS i # socket rank
# returns
# nothing
# local vars
PROLOC
DL thisP # -->this Node
DL deP # -->Debug inst
DL sa, SockAddrL # sockaddr
DL str, 64 # string buf
DL currSockP # -->curr SocketInfo save area
EPILOC
#-----------------------------------------------
.global N.acc
N.acc:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
LOG 4, "accepting..."
mov N.srvSideP(b), c # -->SocketInfo array
mov $SocketInfoL, a
mull i(bp) # curr SocketInfo offset
lea (c, a), a # -->curr SocketInfo
mov a, currSockP(bp) # save -->curr SocketInfo
# accept
pushl $0
pushl $0
# mov thisP, b # -->Node inst
pushl N.ssc(b) # listen socket
# call accept
# cmp $0, a
# jnl 0f
# SYSERR "accept"
#0:
SYS accept
mov currSockP(bp), c # -->Node inst
mov a, So.sc(c) # save comm socket of accepted connection
pushl $0f # -->sa length
lea sa(bp), a # -->sockaddr
push a
pushl So.sc(c) # comm socket
call getpeername
jmp 1f
0: .int SockAddrL
1: pushl deP(bp)
lea sa(bp), a # -->sockaddr
push a
lea str(bp), a # -->string buf
push a
call gai # get string of addr/port
mov currSockP(bp), c # -->curr SocketInfo
pushl So.sc(c) # comm socket
LOG 3, "peer: sc=%d, %s", 2
# prepare SSL
mov thisP(bp), b # -->this Node
cmp $1, N.ssl(b) # node's ssl switch
jne 1f # no SSL
pushl N.ctxP(b) # -->SSL context
call SSL_new # create a SSL structure
mov currSockP(bp), c # -->curr SocketInfo
mov a, So.sslP(c) # put -->SSL struct into SocketInfo
test a, a
jnz 0f
SSLERR "new SSL"
# accept on SSL
0: call ERR_clear_error
mov currSockP(bp), c # -->curr SocketInfo
pushl So.sc(c) # socket #
pushl So.sslP(c) # -->SSL struct
call SSL_set_fd # connect the SSL object with a file descriptor
test a, a
jnz 0f
SSLERR "client SSL set fd"
0:
LOG 5, "SSL fd set"
call SSL_accept
test a, a
jns 1f
mov a, 4(sp)
call SSL_get_error # SSL_get_error(-->ssl, err)
cmp $5, a # SSL_ERROR_SYSCALL ?
jne 0f # no, other SSL err
SYSERR "SSL accept"
0: SSLERR "SSL accept"
1:
LOG 2, "accepted"
EPILOG
#-----------------------------------------------
# R E A D F R O M C O M M S O C K E T
#-----------------------------------------------
ARGS
DS sci # SocketInfo rank
# returns length read
PROLOC
DL thisP # -->this Node
DL deP # -->Debug inst
DL shP # -->shared mem
DL currSockP # -->curr SocketInfo
DL n # len read
DL rest # len to be read
DL buf # read dest
EPILOC
#-----------------------------------------------
.global N.read
N.read:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
mov C.csP, a # -->CS
mov C.shP(a), a # -->shared mem
mov a, shP(bp) # save -->shared mem
push N.dataP(b)
call Da.getDataLen
mov a, rest(bp) # init len to be read
call Da.getContP
mov a, buf(bp) # init read dest
mov N.srvSideP(b), c # -->server side SocketInfo array
mov $SocketInfoL, a
mull sci(bp) # curr SocketInfo offset
lea (c, a), c # -->curr SocketInfo
mov c, currSockP(bp) # save -->curr SocketInfo
push So.sc(c) # comm socket
push rest(bp) # len to be read
LOG 5, "ready to read len=%d from sc=%d..."
# iterate on reading until whole length is read
N.readIterate:
cmp $0, rest(bp) # something to be read ?
jng N.readAcc # no, end reading
push rest(bp) # len to be read
push buf(bp) # read dest
mov currSockP(bp), c # curr SocketInfo
cmp $1, N.ssl(b) # SSL ?
jne 0f # no
# read w/ SSL
push So.sslP(c) # -->SSL struct
SYS SSL_read
jmp 1f
# read w/o SSL
0:
push So.sc(c) # socket #
SYS read
# adjust read controls
1:
jz N.readEof # EOF read
add a, buf(bp) # adjust -->read dest
sub a, rest(bp) # decrement rest to read
cmp $0, rest(bp) # everything read ?
jng 0f
push a
LOG 5, "partly read %d bytes"
0: jmp N.readIterate
# account data just read
N.readAcc:
mov shP(bp), a # -->shared mem
lea S.counter_sem(a), a # overall msg counter semaphore
push a
SYS sem_wait # get semaphore
mov shP(bp), a
incl S.msgs(a) # incr msgs counter
lea S.counter_sem(a), a # overall msg counter semaphore
push a
SYS sem_post # post semaphore
lea -4(sp), sp # adjust stacker
push N.dataP(b) # -->Data
call Da.getPort
mov a, 4(sp) # remote listen port
call Da.getDataLen
mov a, (sp) # data length
LOG 5, "%d read from node %u"
mov (sp), a # return len read
jmp N.readRet
# EOF read, finish
N.readEof:
LOG 4, "EOF read"
N.readRet:
EPILOG_R
#-----------------------------------------------
# W R I T E T O C O M M S O C K E T
#-----------------------------------------------
ARGS
DS sci # SocketInfo rank
# returns length written
PROLOC
DL thisP # -->this Node
DL deP # -->Debug inst
DL currSockP # -->curr SocketInfo
DL n # len written
DL rest # len to be written
DL buf # write orig
EPILOC
#-----------------------------------------------
.global N.write
N.write:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
# init control values
mov C.csP, a # -->CS
mov C.shP(a), a # -->shared mem
mov a, shP(bp) # save -->shared mem
push N.dataP(b) # -->Data
call Da.getDataLen
mov a, rest(bp) # init len to be written
call Da.getContP
mov a, buf(bp) # init write orig
mov N.cliSideP(b), c # -->SocketInfo array
mov $SocketInfoL, a
mull sci(bp) # curr SocketInfo offset
lea (c, a), c # -->curr SocketInfo
mov c, currSockP(bp) # save -->curr SocketInfo
push So.sc(c) # comm socket
push rest(bp) # len to be written
LOG 5, "ready to write len=%d to sc=%d..."
# iterate on writing until whole length is written
N.writeIterate:
cmp $0, rest(bp) # something to be written ?
jng N.writeEnd # no, end writing
push rest(bp) # len to be written
push buf(bp) # write orig
cmp $1, N.ssl(b) # SSL ?
jne 0f # no
# SSL
pushl So.sslP(c) # -->SSL struct
SYS SSL_write
jmp 1f
# no SSL
0:
pushl So.sc(c) # socket
SYS write
# adjust write controls
1:
mov a, n(bp) # save len written
add a, buf(bp) # adjust write orig
sub a, rest(bp) # decrement rest to write
cmp $0, rest(bp) # everything written ?
jng 0f
push a
LOG 5, "partly written %d bytes"
0: jmp N.writeIterate
# return datalen
N.writeEnd:
mov N.dataP(b), a
mov Da.datalen(a), a
push a
LOG 5, "%d written", 1
EPILOG_R
#-----------------------------------------------
# G E T D A T A F R O M C O M M S O C K E T
#-----------------------------------------------
ARGS
DS sci # SocketInfo rank
# returns boolean: 0 - EOF read, 1 - data read
PROLOC
DL thisP # -->this Node
DL deP
EPILOC
#-----------------------------------------------
.global N.get
N.get:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
push sci(bp) # SocketInfo rank
LOG 5, "get data from socket, rank=%u"
call N.read
test a, a
jz 0f
mov $1, a
0: EPILOG_R
#-----------------------------------------------
# P U T D A T A T O C O M M S O C K E T
#-----------------------------------------------
ARGS
DS sci # SocketInfo rank
# returns boolean: 0 - nothing written, 1 - data written
PROLOC
DL thisP # -->this Node
DL deP
EPILOC
#-----------------------------------------------
.global N.put
N.put:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
LOG 5, "put data to socket"
mov N.dataP(b), d # -->Data
mov Da.contP(d), a # -->container
lea Co.hdr(a), a # -->header
mov N.locPort(b), c
mov c, H.lport(a) # listen port to header
pushl sci(bp) # SocketInfo rank
call N.write
test a, a
jz 0f
mov $1, a
0: EPILOG_R
#-----------------------------------------------
# C L O S E C O N N E C T I O N S O C K E T
#-----------------------------------------------
ARGS
DS sci # SocketInfo rank
DS server # indicate server or client
DS deP # -->Debug inst
# returns:# nothing
PROLOC
DL thisP # -->this Node
DL currSockP # -->curr SocketInfo
DL tagP
DL e
EPILOC
N.srvSideTag: .asciz "server side"
N.cliSideTag: .asciz "client side"
#-----------------------------------------------
.global N.closeSocket
N.closeSocket:
PROLOG
mov b, thisP(bp) # save -->this Node
mov N.srvSideP(b), c # -->SocketInfo array
movl $N.srvSideTag, tagP(bp)
cmp $1, server(bp) # server side ?
je 0f
mov N.cliSideP(b), c # client side
movl $N.cliSideTag, tagP(bp)
0: mov $SocketInfoL, a
mull sci(bp) # curr SocketInfo offset
lea (c, a), c
mov c, currSockP(bp) # save -->curr SocketInfo
push So.sc(c) # socket to close
push tagP(bp)
LOG 5, "closing %s sc=%u...", 2
cmp $0, N.ssl(b) # SSL ?
je 2f # no
# SSL shutdown
pushl So.sslP(c) # -->SSL structure
call SSL_shutdown
cmp $0, a
jns 0f # no err
SSLERR "SSL shutdown (1)"
0: jnz 2f # SSL shutdown finished
LOG 5, "SSL shutdown rc=0, retry"
mov currSockP(bp), c # -->curr SocketInfo
pushl So.sslP(c) # -->SSL structure
call SSL_shutdown
cmp $0, a
jg 2f # SSL shutdown finished
push a # SSL err
mov currSockP(bp), c # -->curr SocketInfo
pushl So.sslP(c) # -->SSL structure
call SSL_get_error
cmp $5, a # SSL_ERROR_SYSCALL ?
jne 1f # no, other SSL err
# syscall err
call ERR_get_error
test a, a # SSL err ?
jnz 2f # yes, don't care
call __errno_location # sys err ?
test a, a
jz 2f # no
SYSERR "SSL shutdown (2)"
# other SSL err, print SSL err queue
1: SSLERR "SSL shutdown (2)"
# close socket
2: mov currSockP(bp), c # -->curr SocketInfo
pushl So.sc(c) # socket to close
# call close
# test a, a
# jz 0f
# SYSERR "close"
#0:
SYS close
push tagP(bp)
LOG 4, "%s sc=%u closed", 2
mov currSockP(bp), c # -->curr SocketInfo
movl $-1, So.sc(c) # indicate closed socket
EPILOG
#-----------------------------------------------
# C R E A T E T H R E A D T O C L O S E C L I E N T S I D E S
#-----------------------------------------------
ARGS
PROLOC
DL thisP # -->this Node
DL deP # -->Debug inst
DL pt # pthread_t (int)
EPILOC
#-----------------------------------------------
N.closeClients:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save local -->Debug
cmp $1, N.closing(b)
je 0f # already closing, nothing to do
LOG 5, "spawning thread to close client side"
push b # -->this (param for thread)
pushl $N.closeCliThread # -->thread func
pushl $0
lea N.ptid(b), a # -->thread ID
push a
call pthread_create
test a, a
jz 1f
SYSERR "create thread"
1: mov thisP(bp), b # -->this Node
movl $1, N.closing(b) # indicate "closing"
push N.ptid(b)
LOG 5, "closing thread (%u) spawned"
0: EPILOG
#-----------------------------------------------
# C L O S E C L I E N T S I D E S
#-----------------------------------------------
ARGS
DS thisP
PROLOC
DL deP # -->Debug inst
DL debug, DebugL
EPILOC
#-----------------------------------------------
N.closeCliThread:
PROLOG
mov thisP(bp), b # -->this Node
lea debug(bp), a
mov a, deP(bp) # save deP
lea N.debug(b), a
lea D.id(a), a # -->caller debug id
push a
DEBID "%s CLOSE clients", 1
LOG 5, "start..."
mov $0, c # offet in socket info array
mov $0, d # socket info rank
mov N.cliSideP(b), a # -->client socket info array
0: # iterate on opened sockets
cmp $-1, So.sc(a, c) # sc allocated ?
je 1f # no, iterate
push So.sc(a, c) # socket#
LOG 4, "sc=%u"
push deP(bp)
pushl $0 # indicate "client"
push d # socket info rank
call N.closeSocket # close socket
1: inc d
cmp N.nodes(b), d # rank < nodes ?
je 0f # no, end
lea SocketInfoL(c), c # incr offset
jmp 0b # iterate
0: LOG 4, "finished, exiting thread"
pushl $0
call pthread_exit
EPILOG
#-----------------------------------------------
# S Y N C H R O N I Z E A L L N O D E S
#-----------------------------------------------
ARGS
PROLOC
DL thisP
DL deP
DL shP
DL tosignal
DL sigmask, 128
DL sigact, SigActionL
EPILOC
N.sync:
PROLOG
mov b, thisP(bp) # save -->this Node
lea N.debug(b), a
mov a, deP(bp) # save -->Debug locally for LOG macro
movl $0, tosignal(bp)
mov C.csP, c # -->CS
mov C.shP(c), a # -->shared mem
push S.act(a)
mov a, shP(bp)
push N.pid(b)
LOG 5, "%u synchronizing: active=%u"
# set signal handler block
lea sigact(bp), a
movl $N.sighandle, sa_handler(a)
movl $0, sa_flags(a)
lea sa_mask(a), a # set handler mask
push a
SYS sigfillset
jmp 0f
N.sighandle:
ret
0:
# activate signal handler
lea sigmask(bp), a
push a
SYS sigemptyset
push $SIGUSR2
lea sigmask(bp), a
push a
SYS sigaddset
push $0
lea sigmask(bp), a
push a
push $SIG_UNBLOCK
SYS sigprocmask
push $0
lea sigact(bp), a
push a
push $SIGUSR2
SYS sigaction
# update nodes counter
mov shP(bp), a
lea S.counter_sem(a), a
push a
SYS sem_wait # get semaphore
mov shP(bp), a
decl S.act(a) # decrement nodes counter
setz tosignal(bp)
mov shP(bp), a
lea S.counter_sem(a), a
push a
SYS sem_post # post semaphore
testl $1, tosignal(bp) # all finished?
jz N.sigwait
# signal all tasks
push N.pid(b)
LOG 5, "%u signaling USR2"
push $SIGUSR2
push $0
SYS kill
jmp N.woken
# wait for signal
N.sigwait:
# call pause
push $5
call sleep
N.woken:
mov thisP(bp), b # -->this Node
push N.pid(b)
LOG 5, "%u woken up"
EPILOG
#-----------------------------------------------
# F O R M A T A N D P R I N T S S L E R R O R
#-----------------------------------------------
# args
ac = 8
DS deP # -->Debug
DS msgP # -->accompanying msg str
# returns: nothing
PROLOC
DL str, 256 # generated string buf
EPILOC
#-----------------------------------------------
.global N.sslErr
N.sslErr:
enter $locL, $0
0:
call ERR_get_error
push a
LOG 5, "ssl err, e=%lu", 1
test a, a # end of msg queue ?
jz 0f # yes
lea str(bp), d
push d # -->err msg str buf
push a # err
call ERR_error_string
push a # -->err msg str
pushl msgP(bp) # -->accompanying msg str
LOG 0, "%s: %s", 2
jmp 0b # iterate on SSL err msg queue
0:
pushl $1
call exit # ABEND
#-----------------------------------------------
# G E T A S S I G N E D I P A D D R
#-----------------------------------------------
# puts assigned IP address:port in string
ARGS
DS sP # -->output string
DS saP # -->sockaddr
DS deP # -->Debug
# returns: nothing
PROLOC
EPILOC
#-----------------------------------------------
gai: PROLOG
mov saP(bp), b # -->sockaddr
xor a, a
mov sa_data(b), %ax # port # in big endian order
push a
call ntohs
mov a, d
mov saP(bp), b # -->sockaddr
pushl sa_data+2(b) # IP addr in big endian order
call ntohl
push d # port # as int
mov $4, c # iterate on 4 bytes of IP addr
0: mov a, d
and $0xff, d
push d # part of IP addr
shr $8, a
dec c
jnz 0b # iterate
push $0f
pushl sP(bp) # -->output buf
call sprintf
jmp 1f
0: .ascii "%u.%u.%u.%u:%d\0"
1:
EPILOG
#-----------------------------------------------
.end