diff -r 000000000000 -r 5c129dd80d4f CSa32/Node.S --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSa32/Node.S Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,1616 @@ + .include "DS.S" + .data + .text +#----------------------------------------------- +# N O D E O P E R A T I O N S +#----------------------------------------------- + ARGS + DS CnstlnP # -->Constellation inst + DS port # node's port# +# returns +# nothing +# local vars + PROLOC + DL this, NodeL # this Node instance + DL thisP # -->this Node + DL deP # -->Debug inst for LOG macro + DL rc + DL tosignal # switch + DL sigact, SigActionL + DL sigmask, 128 + DL sigret + EPILOC +#----------------------------------------------- + .global Node +Node: + PROLOG + lea this(bp), b # -->this Node + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save -->Debug locally for LOG macro +# save parm values + mov CnstlnP(bp), c # -->Constellation + mov c, N.cnstlnP(b) # save -->Constellation + mov port(bp), a + mov a, N.locPort(b) # save port# +# set Debug id + push a + cmpl $Cn.ring, Cn.topo(c) + je 0f + push $8f + jmp 1f +0: push $7f +1: cmp $0,Cn.ssl(c) + je 2f + push $6f + jmp 9f +2: push $5f + jmp 9f +5: .asciz "non" +6: .asciz "" +7: .asciz "RING" +8: .asciz "MASH" +9: DEBID "%sSSL %s node %d", 3 # N->ssl ? "" : "non", N.topo==mash ? "MASH" : "RING", locPort + call N.init # initialize node + call N.bind # bind node to local port + push N.kicker(b) # kicker indicator + LOG 5, "kicker=%d" + cmpl $0, N.kicker(b) # kicker ? + je 0f # no + call N.kickOff # start transfer +0: + call N.mainLoop # iterate on socket I/O select + LOG 5, "closing ssc" + push N.ssc(b) + SYS close + movl $0, rc(bp) # rc = OK +# wait on thread closing client side + cmpl $0, N.closing(b) # closing client side ? + je N.chkData # no + push $0 + push N.ptid(b) # thread id + call pthread_join + test a, a + jz 1f + push a # unique error handlig for pthread_join + call strerror + push a + LOG 0, "pthread_join: %s" + push deP(bp) + call C.abend +1: + mov thisP(bp), b # -->this Node + push N.pid(b) + LOG 5, "task %u: closing thread returned" +# synchronize with all nodes in all constellations and then check received data +N.chkData: + call N.sync # wait for others + cmpl $0, N.kicker(b) # kicker ? + je 0f # no + + push N.dataP(b) # -->Data + call Da.chk + test a, a # data check OK ? + jnz 0f # yes + movl $1, rc(bp) # rc = BAD + LOG 0, "data on INPUT TO and OUTPUT FROM constellation DIFFER" +0: + push N.pid(b) + push rc(bp) + LOG 2, "end of operations, rc=%d, process=%u" + call exit +#----------------------------------------------- +# C O N S T R U C T O R +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + DL deP # -->Debug +# DL CnstlnP # -->Constellation + DL len # various lengths + strL = 128 + DL str, strL # string buf + EPILOC +#----------------------------------------------- +N.init: # c: -->Cnstln + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + LOG 4, "initializing..." +# initialize node's attributes + call getpid + mov a, N.pid(b) + mov N.cnstlnP(b), c # -->Constellation + push deP(bp) # -->caller Debug id + lea N.data(b), a + mov a, N.dataP(b) + push a # -->Data + call Data # initialize data item + mov Cn.topo(c), a + mov a, N.topo(b) # save topology + mov Cn.ssl(c), a + mov a, N.ssl(b) # SSL switch + mov Cn.first(c), a + mov a, N.first(b) # save port# of first node in constellation + movl $0, N.kicker(b) # clear kicker switch + cmp a, N.locPort(b) # kicker ? (locPort == first) + sete N.kicker(b) +1: add Cn.nodes(c), a + dec a + mov a, N.last(b) # save port# of last node (last = first + nodes -1) + mov Cn.nodes(c), a + mov a, N.nodes(b) # save num. of nodes in constellation + mov Cn.div(c), a + mov a, N.div(b) # save divisor for random choice (MAX_INT / nodes) + mov Cn.topo(c), a + mov a, N.topo(b) + mov Cn.forwP(c), a + mov a, N.forwP(b) # save -->shared "forward" indicator + movl $0, N.closing(b) # unset closing indicator +# allocate and initialize server side and client side sockets + mov $SocketInfoL, a + mull N.nodes(b) # SocketInfo array len + mov a, N.sockArrLen(b) # save array len + push a + call malloc + cmp $0, a + jnl 0f + SYSERR "malloc" +0: mov thisP(bp), b # -->this Node + mov a, N.srvSideP(b) # save -->array of server side sockets + pushl N.sockArrLen(b) + call malloc + cmp $0, a + jnl 0f + SYSERR "malloc" +0: mov thisP(bp), b # -->this Node + mov a, N.cliSideP(b) # save -->array of client side sockets +# initialize allocated socket infos + xor c, c # offset +0: mov N.srvSideP(b), a # -->server side SocketInfo array +# movl $So.labelV, So.label(a, c) # indicate SocketInfo block + movl $-1, So.sc(a, c) # indicate server side socket is not in use + mov N.cliSideP(b), a # -->client side SocketInfo array +# movl $So.labelV, So.label(a, c) # indicate SocketInfo block + movl $-1, So.sc(a, c) # indicate client side socket is not in use + lea SocketInfoL(c), c # incr by SocketInfo len + cmp c, N.sockArrLen(b) + ja 0b # iterate + + cmpl $0, N.ssl(b) # node's ssl switch + jz N.initR # no SSL +# prepare SSL context +# call SSL_load_error_strings +# call SSL_library_init + call OPENSSL_init_ssl + LOG 4, "setting SSL contex...", 0 + call TLS_method + push a + call SSL_CTX_new + mov thisP(bp), b # -->this Node + mov a, N.ctxP(b) # save -->new SSL context + test a, a + jnz 0f + SSLERR "new SSL CTX" +# set SSL mode +0: pushl $0 + pushl $4 # SSL_MODE_AUTO_RETRY + pushl $33 # SSL_CTRL_MODE + pushl N.ctxP(b) # -->SSL context + call SSL_CTX_ctrl + movl $0, 8(sp) # no callback + movl $2, 4(sp) # mode = SSL_VERIFY_FAIL_IF_NO_PEER_CERT + call SSL_CTX_set_verify # set peer certificate verification parameters +# set X509 key file name + mov thisP(bp), b # -->this Node + pushl N.locPort(b) # local TCP port# + mov C.csP, c + pushl C.cePathP(c) # -->name of SSL path + pushl $0f # -->format + pushl $strL # buf len + lea str(bp), a + push a # -->string buf + call snprintf + jmp 1f +0: .asciz "%skeys/%u.key" +1: + pushl $1 # SSL_FILETYPE_PEM + lea str(bp), a # filename string + push a + LOG 5, "SSL private key used: %s", 1 + mov thisP(bp), b # -->this Node inst + pushl N.ctxP(b) # -->SSL context + call SSL_CTX_use_PrivateKey_file + cmp $1, a + je 0f + SSLERR "hh's key file" +# set X509 cert file name +0: mov thisP(bp), b # -->this Node + pushl N.locPort(b) # local TCP port # + mov C.csP, c + pushl C.cePathP(c) # -->name of SSL path + pushl $0f + pushl $strL # buf len + lea str(bp), a # string buf + push a + call snprintf + jmp 1f +0: .asciz "%scerts/%u.pem" +1: mov thisP(bp), b # -->this Node + LOG 5, "SSL certificate used: %s", 1 + pushl $1 # SSL_FILETYPE_PEM + lea str(bp), a # filename string + push a + pushl N.ctxP(b) # -->SSL context + call SSL_CTX_use_certificate_file + cmp $1, a + je 0f + SSLERR "hh's cert file" +# set path to CA +0: mov thisP(bp), b # -->this Node + mov C.csP, c + pushl C.caPathP(c) # -->name of SSL path + pushl $0f # -->CApath + LOG 5, "SSL: CA path: %s", 1 + pushl $0 # -->CAfile not used + pushl N.ctxP(b) # -->SSL context + call SSL_CTX_load_verify_locations # set default locations for trusted CA certificates + cmp $1, a + je N.initR + SSLERR "hh's thrusted certs path" +0: .asciz "/home/local/etc/ssl/certs/" +N.initR: + LOG 5, "initalized" + EPILOG +#----------------------------------------------- +# K I C K O F F +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + DL deP # -->Debug + DL sci # socket info rank + DL digest, 24 + EPILOC +N.kickOff: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug +# load payload data + movl C.csP, c # -->CS + push C.txtP(c) # -->initial payload text + push C.ttl(c) # initial TTL +# lea N.data(b), a # -->Data +# push a + push N.dataP(b) # -->Data + call Da.load # load data container + call N.nextNode + mov a, N.next(b) # save next node# + sub N.first(b), a # first - next = socketinfo rank + mov a, sci(bp) # save socket info rank + lea digest(bp), a # -->digest buf + push a + push N.dataP(b) # -->Data + call Da.digest24 + call Da.getDataLen + push N.next(b) # next node# + push a # data len + lea digest(bp), a # -->digest + push a + LOG 2, "kicker: ready to initial send %s, len=%u to node %u" + push N.next(b) + push sci(bp) + call N.conn # connect to next node + push sci(bp) + call N.put # send data to next node + EPILOG +#----------------------------------------------- +# M A I N L O O P : I T E R A T E O N S O C K E T I / O +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + DL deP # -->Debug + EPILOC +N.mainLoop: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug +# prepare + LOG 5, "preparing for I/O select..." + call N.clearSocketMasks # clear socket masks for select and zero nfds + call N.maskSsc # mask ssc for select +# iterate while there are some sockets masked for select (nfds > 0) +0: cmp $0, N.nfds(b) # any I/O in progress ? + jz 0f # no, end operations of node + call N.selectSocketIo # select on server side sockets and forward data + call N.clearSocketMasks # clear socket masks for select and zero nfds + call N.maskSsc # mask ssc for select + call N.maskSrvSockets # mask all connected server side sockets for select + jmp 0b +0: EPILOG +#----------------------------------------------- +# S E L E C T S O C K E T I / O +#----------------------------------------------- +# select on masked sockets +# upon return from select +# accept connections to ssc +# forward data from posted sockets +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + DL deP # -->Debug + EPILOC +N.selectSocketIo: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + + push N.nfds(b) + lea N.rs(b), a + push (a) + push 4(a) + push N.ssc(b) + LOG 5, "select ssc=%u, mask=%08x %08x, nfds=%u" + lea N.t(b), a # -->timeval + movl $1, Ti.secs(a) + movl $0, Ti.usecs(a) + push a + lea N.es(b), a + push a # -->es (exceptional mask is not used yet) + pushl $0 + lea N.rs(b), a # -->rs + push a + push N.nfds(b) + SYS select + lea N.rs(b), a + push (a) + push 4(a) + LOG 5, "return from select, mask of posted=%08x %08x" +# check ssc for incomming connect request, accept it and forward data + lea N.rs(b), a # -->rs select mask + push a + pushl N.ssc(b) + call fd_isset + test a, a # ssc I/O ? + jz N.checkAndForw # no, check other sockets +# prepare for accept; find free srv socket info block for accept + mov N.srvSideP(b), a # -->srv socket info array + xor c, c # offet in socket info array + xor d, d # socket info rank +0: cmp $-1, So.sc(a, c) # socket allocated ? + je 0f # no, free socket found + inc d + lea SocketInfoL(c), c # inc offset into array + cmp c, N.sockArrLen(b) + ja 0b # iterate on sockets + ERR "can't accept, all sockets in use" +# free socket found, accept connection on it and forward data +0: push d + LOG 4, "slot for accept=%d" + call N.acc + call N.forw +# check all server side sockets for pending I/O and call forward on them +N.checkAndForw: + xor c, c # offet into socket info array + lea N.rs(b), a # -->rs select mask + push a + lea -4(sp), sp # adjust stacker for iteration + xor c, c # zero offset + xor d, d # set counter +# iterate through server side sockets and forward from posted sockets +0: mov N.srvSideP(b), a # -->srv socket info array + cmp $-1, So.sc(a, c) # socket connected ? + je 1f # no, iterate + mov So.sc(a, c), a + mov a, (sp) # stack socket# +# LOG 0, "checking port %u" + call fd_isset # socket I/O ? + test a, a + jz 1f # no, iterate + mov N.srvSideP(b), a + mov So.sc(a, c), a + mov d, (sp) # stack socket rank +# LOG 0, "port posted, rank=%u" + call N.forw # forward data +1: inc d + lea SocketInfoL(c), c # inc offset into array + cmp c, N.sockArrLen(b) + ja 0b # iterate on srv side sockets + EPILOG +#----------------------------------------------- +# M A S K C O N N E C T E D S O C K E T S F O R S E L E C T +#----------------------------------------------- +# mask all connected server side socketS for next select + PROLOC + DL thisP # -->this Node + EPILOC +N.maskSrvSockets: + PROLOG + mov b, thisP(bp) # save -->this Node + + lea N.rs(b), a # -->rs select mask + push a + lea -4(sp), sp # adjust stacker for iteration + xor c, c # zero offet +0: mov N.srvSideP(b), a # -->srv socket info array + cmp $-1, So.sc(a, c) # socket connected ? + je 1f # no, iterate + mov So.sc(a, c), a + mov a, (sp) # stack socket# + call N.maskSocket # mask socket for select +1: lea SocketInfoL(c), c # inc offset into array + cmp c, N.sockArrLen(b) + ja 0b # iterate on srv side sockets + EPILOG +#----------------------------------------------- +# M A S K S S C F O R S E L E C T +#----------------------------------------------- +# mask ssc for select until forward is disabled + PROLOC + DL thisP # -->this Node + EPILOC +N.maskSsc: + PROLOG + mov b, thisP(bp) # save -->this Node + + mov N.forwP(b), a + cmp $1, (a) # forwarding enabled ? + jne 0f # no, return + push N.ssc(b) + call N.maskSocket +0: EPILOG +#----------------------------------------------- +# C L E A R S O C K E T M A S K S +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + EPILOC +N.clearSocketMasks: + PROLOG + mov b, thisP(bp) # save -->this Node + + movl $0, N.nfds(b) # zero nfds + lea N.rs(b), a # zero rs select mask + push a + call fd_zero + lea N.es(b), a # zero es select masks + push a + call fd_zero + EPILOG +#----------------------------------------------- +# M A S K S O C K E T F O R S E L E C T +#----------------------------------------------- + ARGS + DS sc # socket# + PROLOC + DL thisP # -->this Node + EPILOC +N.maskSocket: + PROLOG + mov b, thisP(bp) # save -->this Node + + lea N.rs(b), a # -->rs select mask + push a + pushl sc(bp) + call fd_set + mov sc(bp), a # socket# + cmp a, N.nfds(b) # nfds > sc ? + jg 0f # yes + inc a + mov a, N.nfds(b) # nfds = sc + 1 +0: EPILOG +#----------------------------------------------- +# F D S E T O P E R A T I N O S +#----------------------------------------------- + PROLOC # local data frame def. is used by 4 following subrs + DL thisP # -->this Node + dL deP + EPILOC +fd_zero: +# zero FD mask + ARGS + DS fdsetP # -->FD mask + PROLOG + mov fdsetP(bp), a + xor c, c +0: movl $0, (a, c, 4) + inc c + cmp $32, c # size of FD mask is 32 int (1024 bits) + jl 0b + EPILOG +fd_set: +# mask socket in FD mask + ARGS + DS sc + DS fdsetP # -->FD mask + PROLOG + movl sc(bp), a # sc + call fd_mask # a: integer offset, d: "1" bit in position according to sc + mov fdsetP(bp), c # -->select mask + or d, (c, a) # set sc mask + EPILOG +fd_clear: +# clear socket from FD mask + ARGS + DS sc + DS fdsetP # -->FD mask + PROLOG + movl sc(bp), a # sc + call fd_mask # a: integer offset, d: "1" bit in position according to sc + mov fdsetP(bp), c # -->select mask + not d + and d, (c, a) # clear sc mask + EPILOG +fd_isset: +# test socket bit status in FD mask + ARGS + DS sc + DS fdsetP # -->FD mask + PROLOG + movl sc(bp), a # sc + call fd_mask # a: integer offset, d: "1" bit in position according to sc + mov fdsetP(bp), c # -->select mask + and (c, a), d # sc selected ? + mov d, a +# mov $0, a +# jz 0f # not selected +# mov $1, a +0: EPILOG_R +fd_mask: # a: sc +# adjust offsets into FD mask + xor d, d + movl $32, c + divl c # a: mask integer rank, c: bit offset + shll $2, a # a: mask integer offset + mov d, c + mov $1, d + shl %cl, d # d: "1" bit in position according to sc + ret # a: offset of mask integer +#----------------------------------------------- +# F O R W A R D D A T A T O O T H E R N O D E +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank + PROLOC + DL thisP # -->this Node + DL deP # -->Debug inst + DL digest, 24 + EPILOC +#----------------------------------------------- +N.forw: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + + push sci(bp) # socket info rank + call N.get + test a, a # data read ? + jz N.forwCloseSock # no, EOF, close this socket and start closing clients +# kicker checks TTL + cmp $1, N.kicker(b) # kicker ? + jne 1f # no, continue forwarding + + lea -4(sp), sp # adjust stacker + lea digest(bp), a + push a # -->digest buf + push N.dataP(b) # -->Data + call Da.ttl + mov a, 8(sp) # TTL from data header + call Da.digest24 + mov a, 4(sp) # -->text digest + call Da.getPort + mov a, (sp) # remote listen port from data header + LOG 5, "received from node %u: %s, ttl=%d" + push N.dataP(b) # -->Data + call Da.dttl # decrement TTL in data + test a, a # TTl > 0 ? + jz N.forwKickerStop # no, disable forwarding in constellation +# forward data +1: call N.nextNode # calculate next node + mov a, N.next(b) + push a + LOG 4, "next node %u" + sub N.first(b), a # socket info rank (next - first) + mov a, sci(bp) # save +# connect if not connected yet + movl $SocketInfoL, d + mul d # a: offset into socket info array + mov N.cliSideP(b), c # -->client side socket info array + cmp $-1, So.sc(c, a) # connected ? + jne 0f # yes + push N.next(b) + push sci(bp) + call N.conn +0: + lea -4(sp), sp + push N.dataP(b) # -->Data + call Da.ttl + mov a, 4(sp) # ttl + call Da.getDataLen + mov a, (sp) # container len + push N.next(b) # next node# + LOG 5, "forwarding to %d, len=%d, ttl=%d --->" +# pacing + mov C.csP, c # -->CS + cmp $0, C.pacing(c) # pacing active ? + je 0f # no + pushl $0 + lea C.pace.tv_sec(c), a + push a + SYS nanosleep # pace +0: push sci(bp) + call N.put + LOG 4, "leaving forward" + jmp N.forwR # exit forwarding +N.forwKickerStop: # ttl = 0 + lea digest(bp), a + push a # -->digest buf + push N.dataP(b) # -->Data + call Da.digest24 + push a + LOG 1, "received after finally passing all: %s", 1 + mov N.forwP(b), a # -->shared forw indicator + movl $0, (a) # disable forwarding for all nodes in constellation + jmp N.forwCloseCli +N.forwCloseSock: + pushl deP(bp) + pushl $1 # indicate "server side" + push sci(bp) # socket info rank + call N.closeSocket # close this server side socket +N.forwCloseCli: + call N.closeClients # launch client side closing thread + LOG 4, "leaving forward, closing" +N.forwR: + EPILOG +#----------------------------------------------- +# D E T E R M I N E N E X T N O D E +#----------------------------------------------- + PROLOC + DL thisP # -->this Node + EPILOC +N.nextNode: + PROLOG + mov b, thisP(bp) # save -->this Node + + cmp $Cn.ring, N.topo(b) # ring ? + jne 0f # no +# ring + mov N.locPort(b), a + inc a # next = locPort + 1 + cmp a, N.last(b) # next > last ? + jnl N.nextNodeR + mov N.first(b), a + jmp N.nextNodeR +# mash +0: call random + xor d, d + mov thisP(bp), b + divl N.div(b) # 0 <= random < nodes + add N.first(b), a # first + random + cmp a, N.locPort(b) + je 0b # iterate until other then local node is selected +N.nextNodeR: + push a + EPILOG_R + +#----------------------------------------------- +# B I N D T O L O C A L T C P P O R T +#----------------------------------------------- +# returns +# nothing + PROLOC + DL thisP # -->this Node + DL deP # -->Debug + DL i + DL aiP # -->IP addrinfo + strL = 64 + DL str, strL # string buf + EPILOC +#----------------------------------------------- + .global N.bind +N.bind: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + LOG 4, "binding...", 0 +# prepare IP addr + pushl $AddrInfoL + call malloc # alloc addrinfo for hints + mov a, aiP(bp) + pushl $AddrInfoL + pushl $0 + push a + call memset # clear hints + + movl $2, ai_family(a) # AF_INET + movl $1, ai_flags(a) # AI_PASSIVE for bind + movl $0, ai_protocol(a) # any protocol + movl $1, ai_socktype(a) # SOCK_STREAM, blocking type + + mov thisP(bp), b # -->this Node + pushl N.locPort(b) # local port # + push $0f + lea str(bp), a + push a # -->str buff + call sprintf + jmp 1f +0: .ascii "%d\0" +1: + lea aiP(bp), a + push a # -->-->addrinfo for new addrinfo + pushl aiP(bp) # -->addrinfo hints + lea str(bp), a + push a # -->loc port # string + pushl $0f + call getaddrinfo + jmp 1f +0: .ascii "localhost\0" +# check IP addr +1: cmp $0, a # getaddrinfo OK ? + jz 0f # yes + push a + call gai_strerror + push a + LOG 0, "getaddrinfo error: %s, ABEND", 1 + push $1 + call exit +# log assigned IP addr +0: pushl deP(bp) + mov aiP(bp), a # -->addrinfo + pushl ai_addrP(a) # -->sockaddr + lea str(bp), a + push a # -->string buf + call gai + + lea str(bp), a + push a + LOG 5, "getaddrinfo OK, %s", 1 +# allocate socket + mov aiP(bp), a + pushl ai_protocol(a) + pushl ai_socktype(a) + pushl ai_family(a) + SYS socket + mov a, N.ssc(b) # save ssc +# set socket option SO_REUSEADDR + pushl $4 # integer width + lea 9f, a # value 1 + push a + pushl $2 # SO_REUSEADDR + pushl $1 # SOL_SOCKET + push N.ssc(b) # ssc + SYS setsockopt + jmp 8f +9: .long 1 +8: +# bind + mov aiP(bp), a + pushl ai_addrlen(a) + pushl ai_addrP(a) + push N.ssc(b) # ssc + SYS bind +# listen + pushl $1 # pend conns queue len + pushl N.ssc(b) # ssc + SYS listen + pushl N.locPort(b) + LOG 2, "bound to %d", 1 +N.bindR: + EPILOG +#----------------------------------------------- +# C O N N E C T T O R E M O T E N O D E +#----------------------------------------------- +# args + ac = 40 + DS i # socket rank + DS remPort # TCP port# on remote site +# returns +# nothing +# local vars + ac = 0 + DL debug, DebugL # Debug inst + DL deP # -->Debug inst + DL retry # retry counter + DL ai, AddrInfoL # IP addrinfo + DL aiP # -->IP addrinfo + DL str, 64 # string buf + DL currSockP # -->SocketInfo save area + locL = ac +#----------------------------------------------- + .global N.conn +N.conn: + PROLOG + mov b, thisP(bp) # save -->this Node + lea debug(bp), a # -->own local debug block + mov a, deP(bp) # save -->local Debug + pushl remPort(bp) # remote port + lea N.debug(b), d # -->node's Debug + lea D.id(d), a # -->node's debug ID + push a + DEBID "%s to %u", 2 # set local debug ID + LOG 3, "connecting...", 0 + + mov N.cliSideP(b), c # -->SocketInfo array + mov $SocketInfoL, a + mull i(bp) # SocketInfo offset + lea (c, a), c # -->curr SocketInfo + mov c, currSockP(bp) # save -->curr SocketInfo + mov remPort(bp), a + mov a, So.remPort(c) # put remote port # into curr SocketInfo +# prepare IP addrinfo hints + pushl $AddrInfoL # IP addrinfo length + pushl $0 + lea ai(bp), a + mov a, aiP(bp) # -->IP addrinfo + push a + call memset # clear addrinfo + movl $2, ai_family(a) # AF_INET + movl $0, ai_flags(a) + movl $0, ai_protocol(a) # any protocol + movl $1, ai_socktype(a) # SOCK_STREAM +# set up port # string + mov currSockP(bp), a # -->curr socket info + pushl So.remPort(a) # remote port# + pushl $0f + lea str(bp), a + push a # -->port# string + call sprintf + jmp 1f +0: .ascii "%u\0" +# get IP addrinfo block chain +1: lea aiP(bp), a # -->-->IP addrinfo + push a + pushl aiP(bp) # -->IP addrinfo hints + lea str(bp), a # -->remote port # string + push a + pushl $0f + call getaddrinfo + jmp 1f +0: .ascii "localhost\0" +# check IP addr +1: cmp $0, a # getaddrinfo OK ? + jz 0f # yes + push a + call gai_strerror + push a + LOG 0, "getaddrinfo error: %s, ABEND", 1 + push $1 + call exit # ABEND +# log assigned IP addr +0: pushl deP(bp) # -->Debug inst + mov aiP(bp), a # -->IP addrinfo + pushl ai_addrP(a) # -->sockaddr + lea str(bp), a + push a # -->string buf + call gai + LOG 5, "getaddrinfo OK, %s", 1 +# allocate comm socket + mov aiP(bp), a # -->IP addrinfo + pushl ai_protocol(a) + pushl ai_socktype(a) + pushl ai_family(a) +# call socket # allocate comm socket +# cmp $0, a +# jg 0f +# SYSERR "socket alloc" +#0: + SYS socket + mov currSockP(bp), c # -->curr SocketInfo + mov a, So.sc(c) # put new socket # into SocketInfo +# connect to remote partner server side + mov C.csP, a + mov C.connTh(a), a # conn retry threshold + mov a, retry(bp) # initilize retry counter + mov aiP(bp), a # -->IP addrinfo + pushl ai_addrlen(a) + pushl ai_addrP(a) + pushl So.sc(c) # socket # +2: call connect + cmp $0, a + jnl 0f # connected + call __errno_location # get --> sys errno + cmp $111, (a) # ECONNREFUSED ? + je 1f # yes, wait to retry + SYSERR "connect" # no, ABEND +1: pushl $22000 # 22 msecs + call usleep + lea 4(sp), sp + sub $1, retry(bp) + jnz 2b # iterate +# conn retry threshold reached + mov C.csP, a + pushl C.connTh(a) + LOG 0, "connection refused, threshold %d reached", 1 + pushl $1 + call exit # ABEND +0: +# connected, account new connection + mov C.csP, c # -->CS + mov C.shP(c), a # -->shared mem + lea S.counter_sem(a), a # -->overall counter semaphore + push a + SYS sem_wait + mov C.csP, c # -->CS + mov C.shP(c), a # -->shared mem + incl S.conns(a) # incr conns counter + lea S.counter_sem(a), a # -->overall msg counter semaphore + push a + SYS sem_post +# prepare SSL + cmp $1, N.ssl(b) # node's ssl switch + jne N.connRet # no SSL + + LOG 5, "prepare for SSL" + pushl N.ctxP(b) # -->SSL context + call SSL_new # create a SSL structure + + mov currSockP(bp), c # -->curr SocketInfo + mov a, So.sslP(c) # put -->SSL struct into SocketInfo + test a, a + jnz 0f + SSLERR "new SSL" +# connect on SSL +0: call ERR_clear_error + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sc(c) # socket + pushl So.sslP(c) # -->SSL struct + call SSL_set_fd # connect the SSL object with a file descriptor + test a, a + jnz 0f + SSLERR "client SSL set fd" +0: + LOG 5, "SSL fd set" + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sslP(c) # -->SSL struct + call SSL_connect + test a, a # connected ? + jns 1f # yes + push a + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sslP(c) # -->SSL struct + call SSL_get_error # SSL_get_error(-->ssl, err) + cmp $5, a # SSL_ERROR_SYSCALL ? + jne 0f # no, other SSL err + SYSERR "SSL connect" +0: SSLERR "SSL connect" +1: + mov C.csP, a + mov C.connTh(a), a + sub retry(bp), a # # of retries = conn threshold - counter + push a + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sc(c) # socket# + LOG 2, "connected via sc=%d after %d retries", 2 +N.connRet: + EPILOG +#----------------------------------------------- +# A C C E P T C O N N E C T I O N +#----------------------------------------------- + ARGS + DS i # socket rank +# returns +# nothing +# local vars + PROLOC + DL thisP # -->this Node + DL deP # -->Debug inst + DL sa, SockAddrL # sockaddr + DL str, 64 # string buf + DL currSockP # -->curr SocketInfo save area + EPILOC +#----------------------------------------------- + .global N.acc +N.acc: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + LOG 4, "accepting..." + + mov N.srvSideP(b), c # -->SocketInfo array + mov $SocketInfoL, a + mull i(bp) # curr SocketInfo offset + lea (c, a), a # -->curr SocketInfo + mov a, currSockP(bp) # save -->curr SocketInfo +# accept + pushl $0 + pushl $0 +# mov thisP, b # -->Node inst + pushl N.ssc(b) # listen socket +# call accept +# cmp $0, a +# jnl 0f +# SYSERR "accept" +#0: + SYS accept + mov currSockP(bp), c # -->Node inst + mov a, So.sc(c) # save comm socket of accepted connection + pushl $0f # -->sa length + lea sa(bp), a # -->sockaddr + push a + pushl So.sc(c) # comm socket + call getpeername + jmp 1f +0: .int SockAddrL +1: pushl deP(bp) + lea sa(bp), a # -->sockaddr + push a + lea str(bp), a # -->string buf + push a + call gai # get string of addr/port + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sc(c) # comm socket + LOG 3, "peer: sc=%d, %s", 2 +# prepare SSL + mov thisP(bp), b # -->this Node + cmp $1, N.ssl(b) # node's ssl switch + jne 1f # no SSL + + pushl N.ctxP(b) # -->SSL context + call SSL_new # create a SSL structure + mov currSockP(bp), c # -->curr SocketInfo + mov a, So.sslP(c) # put -->SSL struct into SocketInfo + test a, a + jnz 0f + SSLERR "new SSL" +# accept on SSL +0: call ERR_clear_error + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sc(c) # socket # + pushl So.sslP(c) # -->SSL struct + call SSL_set_fd # connect the SSL object with a file descriptor + test a, a + jnz 0f + SSLERR "client SSL set fd" +0: + LOG 5, "SSL fd set" + call SSL_accept + test a, a + jns 1f + mov a, 4(sp) + call SSL_get_error # SSL_get_error(-->ssl, err) + cmp $5, a # SSL_ERROR_SYSCALL ? + jne 0f # no, other SSL err + SYSERR "SSL accept" +0: SSLERR "SSL accept" +1: + LOG 2, "accepted" + EPILOG +#----------------------------------------------- +# R E A D F R O M C O M M S O C K E T +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank +# returns length read + PROLOC + DL thisP # -->this Node + DL deP # -->Debug inst + DL shP # -->shared mem + DL currSockP # -->curr SocketInfo + DL n # len read + DL rest # len to be read + DL buf # read dest + EPILOC +#----------------------------------------------- + .global N.read +N.read: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + + mov C.csP, a # -->CS + mov C.shP(a), a # -->shared mem + mov a, shP(bp) # save -->shared mem + push N.dataP(b) + call Da.getDataLen + mov a, rest(bp) # init len to be read + call Da.getContP + mov a, buf(bp) # init read dest + mov N.srvSideP(b), c # -->server side SocketInfo array + mov $SocketInfoL, a + mull sci(bp) # curr SocketInfo offset + lea (c, a), c # -->curr SocketInfo + mov c, currSockP(bp) # save -->curr SocketInfo + + push So.sc(c) # comm socket + push rest(bp) # len to be read + LOG 5, "ready to read len=%d from sc=%d..." +# iterate on reading until whole length is read +N.readIterate: + cmp $0, rest(bp) # something to be read ? + jng N.readAcc # no, end reading + push rest(bp) # len to be read + push buf(bp) # read dest + mov currSockP(bp), c # curr SocketInfo + cmp $1, N.ssl(b) # SSL ? + jne 0f # no +# read w/ SSL + push So.sslP(c) # -->SSL struct + SYS SSL_read + jmp 1f +# read w/o SSL +0: + push So.sc(c) # socket # + SYS read +# adjust read controls +1: + jz N.readEof # EOF read + add a, buf(bp) # adjust -->read dest + sub a, rest(bp) # decrement rest to read + cmp $0, rest(bp) # everything read ? + jng 0f + push a + LOG 5, "partly read %d bytes" +0: jmp N.readIterate +# account data just read +N.readAcc: + mov shP(bp), a # -->shared mem + lea S.counter_sem(a), a # overall msg counter semaphore + push a + SYS sem_wait # get semaphore + mov shP(bp), a + incl S.msgs(a) # incr msgs counter + lea S.counter_sem(a), a # overall msg counter semaphore + push a + SYS sem_post # post semaphore + + lea -4(sp), sp # adjust stacker + push N.dataP(b) # -->Data + call Da.getPort + mov a, 4(sp) # remote listen port + call Da.getDataLen + mov a, (sp) # data length + LOG 5, "%d read from node %u" + mov (sp), a # return len read + jmp N.readRet +# EOF read, finish +N.readEof: + LOG 4, "EOF read" +N.readRet: + EPILOG_R +#----------------------------------------------- +# W R I T E T O C O M M S O C K E T +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank +# returns length written + PROLOC + DL thisP # -->this Node + DL deP # -->Debug inst + DL currSockP # -->curr SocketInfo + DL n # len written + DL rest # len to be written + DL buf # write orig + EPILOC +#----------------------------------------------- + .global N.write +N.write: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug +# init control values + mov C.csP, a # -->CS + mov C.shP(a), a # -->shared mem + mov a, shP(bp) # save -->shared mem + push N.dataP(b) # -->Data + call Da.getDataLen + mov a, rest(bp) # init len to be written + call Da.getContP + mov a, buf(bp) # init write orig + mov N.cliSideP(b), c # -->SocketInfo array + mov $SocketInfoL, a + mull sci(bp) # curr SocketInfo offset + lea (c, a), c # -->curr SocketInfo + mov c, currSockP(bp) # save -->curr SocketInfo + + push So.sc(c) # comm socket + push rest(bp) # len to be written + LOG 5, "ready to write len=%d to sc=%d..." +# iterate on writing until whole length is written +N.writeIterate: + cmp $0, rest(bp) # something to be written ? + jng N.writeEnd # no, end writing + push rest(bp) # len to be written + push buf(bp) # write orig + cmp $1, N.ssl(b) # SSL ? + jne 0f # no +# SSL + pushl So.sslP(c) # -->SSL struct + SYS SSL_write + jmp 1f +# no SSL +0: + pushl So.sc(c) # socket + SYS write +# adjust write controls +1: + mov a, n(bp) # save len written + add a, buf(bp) # adjust write orig + sub a, rest(bp) # decrement rest to write + cmp $0, rest(bp) # everything written ? + jng 0f + push a + LOG 5, "partly written %d bytes" +0: jmp N.writeIterate +# return datalen +N.writeEnd: + mov N.dataP(b), a + mov Da.datalen(a), a + push a + LOG 5, "%d written", 1 + EPILOG_R +#----------------------------------------------- +# G E T D A T A F R O M C O M M S O C K E T +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank +# returns boolean: 0 - EOF read, 1 - data read + PROLOC + DL thisP # -->this Node + DL deP + EPILOC +#----------------------------------------------- + .global N.get +N.get: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + push sci(bp) # SocketInfo rank + LOG 5, "get data from socket, rank=%u" + call N.read + test a, a + jz 0f + mov $1, a +0: EPILOG_R +#----------------------------------------------- +# P U T D A T A T O C O M M S O C K E T +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank +# returns boolean: 0 - nothing written, 1 - data written + PROLOC + DL thisP # -->this Node + DL deP + EPILOC +#----------------------------------------------- + .global N.put +N.put: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + LOG 5, "put data to socket" + + mov N.dataP(b), d # -->Data + mov Da.contP(d), a # -->container + lea Co.hdr(a), a # -->header + mov N.locPort(b), c + mov c, H.lport(a) # listen port to header + + pushl sci(bp) # SocketInfo rank + call N.write + test a, a + jz 0f + mov $1, a +0: EPILOG_R +#----------------------------------------------- +# C L O S E C O N N E C T I O N S O C K E T +#----------------------------------------------- + ARGS + DS sci # SocketInfo rank + DS server # indicate server or client + DS deP # -->Debug inst +# returns:# nothing + PROLOC + DL thisP # -->this Node + DL currSockP # -->curr SocketInfo + DL tagP + DL e + EPILOC +N.srvSideTag: .asciz "server side" +N.cliSideTag: .asciz "client side" +#----------------------------------------------- + .global N.closeSocket +N.closeSocket: + PROLOG + mov b, thisP(bp) # save -->this Node + mov N.srvSideP(b), c # -->SocketInfo array + movl $N.srvSideTag, tagP(bp) + cmp $1, server(bp) # server side ? + je 0f + mov N.cliSideP(b), c # client side + movl $N.cliSideTag, tagP(bp) +0: mov $SocketInfoL, a + mull sci(bp) # curr SocketInfo offset + lea (c, a), c + mov c, currSockP(bp) # save -->curr SocketInfo + push So.sc(c) # socket to close + push tagP(bp) + LOG 5, "closing %s sc=%u...", 2 + + cmp $0, N.ssl(b) # SSL ? + je 2f # no +# SSL shutdown + pushl So.sslP(c) # -->SSL structure + call SSL_shutdown + cmp $0, a + jns 0f # no err + SSLERR "SSL shutdown (1)" +0: jnz 2f # SSL shutdown finished + LOG 5, "SSL shutdown rc=0, retry" + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sslP(c) # -->SSL structure + call SSL_shutdown + cmp $0, a + jg 2f # SSL shutdown finished + push a # SSL err + mov currSockP(bp), c # -->curr SocketInfo + pushl So.sslP(c) # -->SSL structure + call SSL_get_error + cmp $5, a # SSL_ERROR_SYSCALL ? + jne 1f # no, other SSL err +# syscall err + call ERR_get_error + test a, a # SSL err ? + jnz 2f # yes, don't care + call __errno_location # sys err ? + test a, a + jz 2f # no + SYSERR "SSL shutdown (2)" +# other SSL err, print SSL err queue +1: SSLERR "SSL shutdown (2)" +# close socket +2: mov currSockP(bp), c # -->curr SocketInfo + pushl So.sc(c) # socket to close +# call close +# test a, a +# jz 0f +# SYSERR "close" +#0: + SYS close + push tagP(bp) + LOG 4, "%s sc=%u closed", 2 + mov currSockP(bp), c # -->curr SocketInfo + movl $-1, So.sc(c) # indicate closed socket + EPILOG +#----------------------------------------------- +# C R E A T E T H R E A D T O C L O S E C L I E N T S I D E S +#----------------------------------------------- + ARGS + PROLOC + DL thisP # -->this Node + DL deP # -->Debug inst + DL pt # pthread_t (int) + EPILOC +#----------------------------------------------- +N.closeClients: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save local -->Debug + cmp $1, N.closing(b) + je 0f # already closing, nothing to do + LOG 5, "spawning thread to close client side" + push b # -->this (param for thread) + pushl $N.closeCliThread # -->thread func + pushl $0 + lea N.ptid(b), a # -->thread ID + push a + call pthread_create + test a, a + jz 1f + SYSERR "create thread" +1: mov thisP(bp), b # -->this Node + movl $1, N.closing(b) # indicate "closing" + push N.ptid(b) + LOG 5, "closing thread (%u) spawned" +0: EPILOG +#----------------------------------------------- +# C L O S E C L I E N T S I D E S +#----------------------------------------------- + ARGS + DS thisP + PROLOC + DL deP # -->Debug inst + DL debug, DebugL + EPILOC +#----------------------------------------------- +N.closeCliThread: + PROLOG + mov thisP(bp), b # -->this Node + lea debug(bp), a + mov a, deP(bp) # save deP + lea N.debug(b), a + lea D.id(a), a # -->caller debug id + push a + DEBID "%s CLOSE clients", 1 + LOG 5, "start..." + mov $0, c # offet in socket info array + mov $0, d # socket info rank + mov N.cliSideP(b), a # -->client socket info array +0: # iterate on opened sockets + cmp $-1, So.sc(a, c) # sc allocated ? + je 1f # no, iterate + push So.sc(a, c) # socket# + LOG 4, "sc=%u" + push deP(bp) + pushl $0 # indicate "client" + push d # socket info rank + call N.closeSocket # close socket +1: inc d + cmp N.nodes(b), d # rank < nodes ? + je 0f # no, end + lea SocketInfoL(c), c # incr offset + jmp 0b # iterate +0: LOG 4, "finished, exiting thread" + pushl $0 + call pthread_exit + EPILOG +#----------------------------------------------- +# S Y N C H R O N I Z E A L L N O D E S +#----------------------------------------------- + ARGS + PROLOC + DL thisP + DL deP + DL shP + DL tosignal + DL sigmask, 128 + DL sigact, SigActionL + EPILOC +N.sync: + PROLOG + mov b, thisP(bp) # save -->this Node + lea N.debug(b), a + mov a, deP(bp) # save -->Debug locally for LOG macro + + movl $0, tosignal(bp) + mov C.csP, c # -->CS + mov C.shP(c), a # -->shared mem + push S.act(a) + mov a, shP(bp) + push N.pid(b) + LOG 5, "%u synchronizing: active=%u" +# set signal handler block + lea sigact(bp), a + movl $N.sighandle, sa_handler(a) + movl $0, sa_flags(a) + lea sa_mask(a), a # set handler mask + push a + SYS sigfillset + jmp 0f +N.sighandle: + ret +0: +# activate signal handler + lea sigmask(bp), a + push a + SYS sigemptyset + push $SIGUSR2 + lea sigmask(bp), a + push a + SYS sigaddset + push $0 + lea sigmask(bp), a + push a + push $SIG_UNBLOCK + SYS sigprocmask + push $0 + lea sigact(bp), a + push a + push $SIGUSR2 + SYS sigaction +# update nodes counter + mov shP(bp), a + lea S.counter_sem(a), a + push a + SYS sem_wait # get semaphore + mov shP(bp), a + decl S.act(a) # decrement nodes counter + setz tosignal(bp) + mov shP(bp), a + lea S.counter_sem(a), a + push a + SYS sem_post # post semaphore + testl $1, tosignal(bp) # all finished? + jz N.sigwait +# signal all tasks + push N.pid(b) + LOG 5, "%u signaling USR2" + push $SIGUSR2 + push $0 + SYS kill + jmp N.woken +# wait for signal +N.sigwait: +# call pause + push $5 + call sleep +N.woken: + mov thisP(bp), b # -->this Node + push N.pid(b) + LOG 5, "%u woken up" + EPILOG +#----------------------------------------------- +# F O R M A T A N D P R I N T S S L E R R O R +#----------------------------------------------- +# args + ac = 8 + DS deP # -->Debug + DS msgP # -->accompanying msg str +# returns: nothing + PROLOC + DL str, 256 # generated string buf + EPILOC +#----------------------------------------------- + .global N.sslErr +N.sslErr: + enter $locL, $0 +0: + call ERR_get_error + push a + LOG 5, "ssl err, e=%lu", 1 + test a, a # end of msg queue ? + jz 0f # yes + lea str(bp), d + push d # -->err msg str buf + push a # err + call ERR_error_string + push a # -->err msg str + pushl msgP(bp) # -->accompanying msg str + LOG 0, "%s: %s", 2 + jmp 0b # iterate on SSL err msg queue +0: + pushl $1 + call exit # ABEND +#----------------------------------------------- +# G E T A S S I G N E D I P A D D R +#----------------------------------------------- +# puts assigned IP address:port in string + ARGS + DS sP # -->output string + DS saP # -->sockaddr + DS deP # -->Debug +# returns: nothing + PROLOC + EPILOC +#----------------------------------------------- +gai: PROLOG + + mov saP(bp), b # -->sockaddr + xor a, a + mov sa_data(b), %ax # port # in big endian order + push a + call ntohs + mov a, d + + mov saP(bp), b # -->sockaddr + pushl sa_data+2(b) # IP addr in big endian order + call ntohl + + push d # port # as int + + mov $4, c # iterate on 4 bytes of IP addr +0: mov a, d + and $0xff, d + push d # part of IP addr + shr $8, a + dec c + jnz 0b # iterate + + push $0f + pushl sP(bp) # -->output buf + call sprintf + jmp 1f +0: .ascii "%u.%u.%u.%u:%d\0" +1: + EPILOG +#----------------------------------------------- + .end