diff -r 000000000000 -r 5c129dd80d4f CSj/CS.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/CSj/CS.java Thu Nov 21 14:55:10 2019 +0100 @@ -0,0 +1,1135 @@ +import java.util.*; +import java.io.*; +import java.net.*; +import javax.net.ssl.*; +import java.security.*; +import java.nio.*; +import java.nio.channels.*; +import java.awt.*; +import java.awt.image.*; +import java.awt.event.*; +import javax.swing.*; +import javax.swing.border.*; + +class Debug { + static int debug; + String debid = ""; + Debug(Debug d) { debid = d.debid; } + Debug(String debid) { this.debid = debid; } + Debug() {} + static void pRe(Object o) { + System.err.println((new Date().getTime()) + " " + o); + System.err.flush(); + } + void log(int level, Object o) { + if(debug == 7) { if(level == 7) pRe(debid + ": " + o); } + else if(debug >= level) pRe(debid + ": " + o); + } + boolean abendMsg(String msg, Exception x) { + log(0, "ABEND: " + msg + (x == null ? "" : (": " + x.getClass().getSimpleName() + ": " + x.getMessage()))); + return false; + } + String prBuf(ByteBuffer b) { return b.position() + "/" + b.remaining() + "/" + b.limit(); } +} +class Data extends Debug implements Serializable { + static final long serialVersionUID = 42; + class DataObj implements Serializable { + static final long serialVersionUID = 42; + String text; + int ttl; + int lport; + int rport; + DataObj(String s, int ttl) { + this.text = s; + this.ttl = ttl; + this.lport = 0; + this.rport = 0; + } + } + ByteBuffer buf; + DataObj data; + Data(Debug deb, String s, int ttl) throws Exception { + super(deb.debid + " DATA"); + data = new DataObj(s, ttl); + if(CS.fake != 0 && debid.equals("DEMO SSL MASH DATA")) throw new Exception("faked error"); // <----------------------------------------------- + load(); + } + Data(Debug deb, ByteBuffer b) throws Exception { + super(deb); + debid += " DATA"; + log(5, "unloading data objects from buffer..."); + unload(); + } + Data(Debug deb) { + super(deb); + debid += " DATA"; + } + void load() throws IOException { + log(5, "loading data objects to buffer..."); + ByteArrayOutputStream bas = new ByteArrayOutputStream(); + try { + new ObjectOutputStream(bas).writeObject(data); + buf = ByteBuffer.wrap(bas.toByteArray()); + } catch(IOException x) { abendMsg("data buffer " + data + " load error", x); throw new IOException(x); } + } + void unload() throws Exception { + buf.rewind(); + data = (DataObj) new ObjectInputStream(new ByteArrayInputStream(buf.array())).readObject(); + } + int dttl() { + return --data.ttl; + } + int ttl() { + return data.ttl; + } + boolean equals(Data data) { + return this.data.text.equals(data.data.text); + } + public String toString() { + return "data: ttl=" + data.ttl + ", text=" + + (data.text.length() < 24 ? data.text : data.text.substring(0, 8) + "-------" + data.text.substring(data.text.length() - 8)); + } +} +class Node extends Debug implements Runnable { + boolean kicker, closing = false; + int thisNode, nextNode, closingThreshHold = 0; + Data data; + Selector selector; + ServerSocketChannel ssc; + SelectionKey ssk; + SSLContext sslC = null; + HashMap cliSide = new HashMap(); + HashMap srvSide = new HashMap(); + Constellation con; + Node(Constellation con, int port) { + super(con); + debid = (con.ssl ? "" : "non") + "SSL " + (con.mash ? "mash" : "ring") + " node " + port; + log(4, "initalizing..."); + try { + this.con = con; + thisNode = port; + kicker = (thisNode == con.first); + if(con.ssl) try { prepareSSL(); } catch(Exception x) { abend("SSL preparation", x); throw new Exception(); } + try { bind(); } catch(Exception x) { abend("bind", x); throw new Exception(); } + data = new Data(this); + data.buf = ByteBuffer.allocate(con.data.buf.limit()); + log(5, "initalized"); + } catch(Exception x) { abend("initialization", x); } + } + public void run() { + runLoop(); + log(2, "ended"); + synchronized(con.glob) { con.glob.spawned--; con.glob.notify(); } + } + public void runLoop() { + if(con.glob.doForward && kicker) { // kick off + log(1, "ready to initial send (" + con.data.buf.limit() + " bytes) " + con.data.toString()); + data.buf.put(con.data.buf); data.buf.flip(); + try { data.unload(); data.load(); } catch(Exception x) {}; + doForward(); + } + //if(con.glob.doForward && !con.glob.abend) do { // main loop + if(con.glob.doForward) do { // main loop + int k = 0; + log(5, "main loop, waiting for " + (con.glob.doForward ? "data" : "close") + + " from " + srvSide.size() + " open sockets, timeout=" + CS.selTO + " msecs, closingThreshHold=" + closingThreshHold); + if(!con.glob.doForward && closingThreshHold == 0) closingThreshHold = 5000 / CS.selTO; + try { + try { while((k = selector.select(CS.selTO)) == 0 && con.glob.doForward) {} + } catch(Exception x) { abend("socket channel select", x); throw new Exception(x); } + if(k > 0) { + for(Iterator ski = selector.selectedKeys().iterator(); ski.hasNext();) { + SelectionKey sk = ski.next(); + if(sk.isAcceptable()) + try { acc(); } catch(Exception x) { abend("main loop accept error", x); throw new Exception(x); } + if(sk.isReadable()) + try { forward(sk); } catch(Exception x) { abend("main loop forward error", x); throw new Exception(x); } + ski.remove(); + } + } + } catch (Exception x) { abend("main loop", x); } + if(con.glob.stop) { + log(1, "constellation stop, closing all connections"); + stop(); return; + } + } while(con.glob.doForward || ((srvSide.size() > 0) && (closingThreshHold-- > 0))); + //} while((con.glob.doForward || (srvSide.size() > 0)) && !con.glob.abend); + log(5, "closing ssc..."); + try { ssc.close(); } catch (Exception x) { abend("ssc close", x); } + if(kicker && !con.glob.stop && !con.glob.abend) con.glob.abend = !data.equals(con.data); + } + private void forward(SelectionKey sk) { + log(5, "entering 'forward'..."); + SockIO si = srvSide.get(sk); + if(!si.get()) { + stop(); + si.close(); + srvSide.remove(sk); + } + else { + log(5, "received " + prBuf(data.buf)); + try { data.unload(); } catch(Exception x) { abend("data unload at forwarding", x); return; } + if(kicker) { + log(3, "received from " + (con.mash ? "mash: " : "ring: ") + data.toString()); + if(data.dttl() <= 0) { + if(CS.isGui) con.cBox.ttl = data.ttl(); + log(1, "TTL 0 reached, received " + data.toString() + ", leaving 'forward' closing all connections"); + stop(); return; + } + try { data.load(); } catch(Exception x) { abend("data load at forwarding", x); return; } + } + if(con.glob.doForward) doForward(); + } + log(4, "leaving 'forward'"); + } + private void doForward() { + if((nextNode = chooseNextNode()) == 0) { stop(); return; } + log(3, "forwarding data to " + nextNode); + if(CS.pacing) pacing(nextNode); + if(CS.pace > 0) try { Thread.sleep(CS.pace); } catch(InterruptedException i) {}; + if(!cliSide.get(nextNode).put()) stop(); + } + private int chooseNextNode() { + int next; + if(con.mash) while((next = (con.first + CS.r.nextInt(con.nodes))) == thisNode); + else { next = thisNode + 1; if(next > con.last) next = con.first; } + if(cliSide.containsKey(next)) return next; + else return (conn(next) ? next : 0); + } + private void pacing(int nextNode) { + log(4, "constls.pacing"); + synchronized(CS.constls) { + if(!con.glob.pacingGo) try { CS.constls.wait(); } catch(InterruptedException x) {} + if(nextNode > 0) { + con.cBox.currLink[0] = thisNode - con.first; + con.cBox.currLink[1] = nextNode - con.first; + } + con.cBox.ttl = data.ttl(); + log(4, "constls.notify"); CS.constls.notify(); + con.glob.pacingGo = false; + } + } + public void stop() { + con.glob.doForward = false; + synchronized(con) { con.notify(); } + closeNode(); + } + private void abend(String msg, Exception x) { + abendMsg(msg, x); + con.glob.abend = true; + CS.isRun = false; + stop(); + } + private void closeNode() { + if(!closing) new Thread(new closeClients(this)).start(); + closing = true; + } + private class closeClients extends Debug implements Runnable { + closeClients(Debug deb) { this.debid = deb.debid + " clients CLOSE"; } + public void run() { + log(4, "starting..."); + for(Iterator i = cliSide.keySet().iterator(); i.hasNext();) { + int port = i.next(); + log(4, "closing conn to " + port); + cliSide.get(port).close(); + } + log(4, "end"); + } + } + private void prepareSSL() throws Exception { + char[] passphrase = "passphrase".toCharArray(); + KeyStore ks = KeyStore.getInstance("JKS"); + String ksFile = (System.getenv("KSF") == null) ? CS.cePath + "/testkeys" : System.getenv("KSF"); + FileInputStream kfs = new FileInputStream(ksFile); + ks.load(kfs, passphrase); + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, passphrase); + KeyStore ts = KeyStore.getInstance("JKS"); + FileInputStream tfs = new FileInputStream(ksFile); + ts.load(tfs, passphrase); + TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); + tmf.init(ts); + sslC = SSLContext.getInstance("TLS"); + sslC.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + kfs.close(); + tfs.close(); + } + private void bind() throws IOException { + log(5, "binding..."); + selector = Selector.open(); + ssc = ServerSocketChannel.open(); + ssc.configureBlocking(false); + ssc.socket().bind(new InetSocketAddress(thisNode), 1024); + ssk = ssc.register(selector, SelectionKey.OP_ACCEPT); + log(2, "bound to " + thisNode); + } + private boolean conn(int remPort) { + int retry = CS.connThreshold; + boolean connected = false; + SocketChannel sc = null; + log(4, "connecting to " + remPort + ", timeout=" + (CS.connThreshold*CS.connTO)/1000 + " secs. ..."); + while(!connected && retry-- > 0 && con.glob.doForward) { + try { + sc = SocketChannel.open(new InetSocketAddress("localhost", remPort)); + connected = true; + } catch(Exception x) { + if(x.getMessage().equals("Connection refused")) { + try { Thread.sleep(CS.connTO); } catch(InterruptedException i) {} + } else { abendMsg("connection to " + remPort, x); return false; } + } + } + if(!con.glob.doForward) return false; + if(connected) { + try { cliSide.put(remPort, new SockIO(this, sc, false)); + } catch(Exception x) { abendMsg("connection to " + remPort, x); return false; } + log(2, "connected after " + (CS.connThreshold - retry + 1) + " retries"); + return true; + } + else { abendMsg("connection to " + remPort + " timeout", null); return false; } + } + private void acc() throws Exception { + try { + SocketChannel sc = ssc.accept(); + sc.configureBlocking(false); + SockIO si = new SockIO(this, sc, true); + SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); + srvSide.put(sk, si); + CS.connCnt++; + log(2, "connection accepted"); + } catch(Exception x) { abend("connection accept", x); throw new Exception(x); } + } + private class SockIO extends Debug { + SocketChannel sc; + Selector handshakeSelector; + SelectionKey sk; + Runnable ru; + SSLSession session; + SSLEngine e; + SSLEngineResult r; + ByteBuffer ci, co, ib; + boolean wrapper; + SockIO(Debug deb, SocketChannel sc, boolean server) throws Exception { + this.debid = deb.debid + " socket " + (server ? "(server)" : "(client)"); + log(5, "initializing..."); + this.sc = sc; + if(con.ssl) { + handshakeSelector = Selector.open(); + sc.configureBlocking(false); + sk = sc.register(handshakeSelector, SelectionKey.OP_READ); + e = sslC.createSSLEngine(); + session = e.getSession(); + int am = session.getApplicationBufferSize(); + int pm = session.getPacketBufferSize(); + ib = ByteBuffer.allocate(am); // pišvejcova konstanta + co = ByteBuffer.allocateDirect(pm); + ci = ByteBuffer.allocateDirect(pm); + if(server) { + e.setUseClientMode(false); + e.setNeedClientAuth(true); + } else e.setUseClientMode(true); + } + log(4, "initialized"); + } + String prBuf(ByteBuffer b) { return b.position() + "/" + b.remaining() + "/" + b.limit() + "/" + b.capacity(); } + private String eStat() { + String stat; + if (r != null) + stat = r.getStatus() + "/" + r.getHandshakeStatus() + "/" + e.getHandshakeStatus() + + ", bytes: " + r.bytesConsumed() + "/" + r.bytesProduced(); + else stat = "-/-/" + e.getHandshakeStatus() + " -/-"; + return stat; + } + //-- result status + private boolean isOK() { + return r.getStatus() == SSLEngineResult.Status.OK; } + private boolean isClosed() { + return r.getStatus() == SSLEngineResult.Status.CLOSED; } + private boolean isBad() { + return r.getStatus() == SSLEngineResult.Status.BUFFER_OVERFLOW; } + //-- result handshake status + private boolean handShake() { + return r.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING; } + private boolean handShakeEnd() { + return r.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED; } + private boolean needTask() { + return r.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK; } + //-- engine handshake status + private boolean needUnwrap() { + return e.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP; } + private boolean needWrap() { + return e.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_WRAP; } + private int read(SocketChannel sc, ByteBuffer b) { + int n = -1, k = 0; + log(5, "read, " + prBuf(b) + " ..."); + if(con.ssl) { + try { while((k = handshakeSelector.select(CS.selTO)) == 0) { if(!con.glob.doForward) break; } + } catch (Exception x) { abendMsg("handshake select", x); k = 0; } + if(k>0 && sk.isReadable()) { + try { n = sc.read(b); } catch(Exception x) { abendMsg("socket channel read", x); n = -1; } + handshakeSelector.selectedKeys().remove(sk); + } + } else + try { while(b.hasRemaining()) { n += sc.read(b); if(n < 0) break; }; n++; + } catch(Exception x) { abendMsg("socket channel read", x); n = -1; }; + log(4, "read " + n); + return n; + } + private int write(SocketChannel sc, ByteBuffer b) { + log(5, "writing " + prBuf(b) + " ..."); + int n = 0; + try { n = sc.write(b); } catch(Exception x) { abendMsg("socket channel write", x); n = -1; } + log(4, "written " + n); + return n; + } + private boolean replenish() { + log(5, "replenishing ci..."); + ci.clear(); + int n = read(sc, ci); + ci.flip(); + return (n >= 0); + + } + void handleUnWrapStatus() { + ByteBuffer b; + if(r.getStatus() == SSLEngineResult.Status.BUFFER_OVERFLOW) { + log(5, "unwrap: ib BUFFER_OVERFLOW " + prBuf(ib)); + if(ib.position() > 0) { ib.flip(); data.buf.put(ib); ib.clear(); } + else { + b = ByteBuffer.allocate((int)(1.25 * ib.capacity())); + ib.flip(); b.put(ib); ib = b; } + log(5, "ib " + prBuf(ib)); + } + if(r.getStatus() == SSLEngineResult.Status.BUFFER_UNDERFLOW) { + int n; + log(5, "unwrap: ci BUFFER_UNDERFLOW " + prBuf(ci)); + if(ci.limit() < ci.capacity()) { + ci.mark(); ci.position(ci.limit()); ci.limit(ci.capacity()); + n = read(sc, ci); ci.limit(ci.position()); ci.reset(); } + else { + n = ci.capacity(); if(ci.position() == 0) n *= 2; + b = ByteBuffer.allocate(n); b.put(ci); ci = b; + n = read(sc, ci); ci.flip(); } + log(5, "additional " + n + " bytes read: " + prBuf(ci)); } + } + boolean doWrap() { + log(5, "entering " + "wrap, ob: " + prBuf(data.buf) + ", co: " + prBuf(co) + " ..."); + try { r = e.wrap(data.buf, co); } catch (Exception x) { abend("SSL engine wrap", x); return false; } + //if(isBad()) { con.glob.abend = true; return abendMsg("SSL engine status: " + r.getStatus().toString(), null); } + if(isBad()) { abend("SSL engine status: " + r.getStatus().toString(), null); return false; } + log(4, "after " + (wrapper ? "wrapper" : "unwrapper") + " wrap: " + eStat() + ", ob: " + prBuf(data.buf) + ", co: " + prBuf(co)); + if(isClosed()) return false; + return true; + } + private boolean wrap() { + do {co.clear(); + if(!doWrap()) return false; + if(needTask()) while((ru = e.getDelegatedTask()) != null) ru.run(); + co.flip(); + if(handShake() && (write(sc, co) == -1)) return false; + } while (needWrap()); + if(needUnwrap() && !( replenish() && unwrap() )) return false; + log(5, (wrapper ? "wrapper" : "unwrapper") + " wrap HS finished=" + handShakeEnd()); + if(wrapper && handShakeEnd()) { + co.clear(); + if(!doWrap()) return false; + co.flip(); + } + return true; + } + boolean doUnwrap() { + log(5, "entering " + "unwrap, ib: " + prBuf(ib) + ", ci: " + prBuf(ci) + " ..."); + try { r = e.unwrap(ci, ib); } catch (Exception x) { abend("SSL engine unwrap", x); return false; } + log(4, "after " + (wrapper ? "wrapper" : "unwrapper") + " unwrap: " + eStat() + ", ib: " + prBuf(ib) + ", ci: " + prBuf(ci)); + if(isClosed()) return false; + return true; + } + private boolean unwrap() { + do { // while( needUnwrap() ) + do { // while( needUnwrap() && ci.hasRemaining() ) + if(!doUnwrap()) return false; + if(needTask()) while((ru = e.getDelegatedTask()) != null) ru.run(); + } while(needUnwrap() && ci.hasRemaining()); + if(needUnwrap() && !replenish()) return false; + } while(needUnwrap()); + if(needWrap() && !wrap()) return false; + if(!wrapper) { + if(handShakeEnd() && !replenish()) return false; + if(handShakeEnd() || !handShake()) { + handleUnWrapStatus(); + while(ci.hasRemaining()) { + do { + if(!doUnwrap()) return false; + handleUnWrapStatus(); + } while(!isOK()); + } + } + } + return true; + } + boolean get() { + wrapper = false; + boolean got = false; + data.buf.clear(); + if(con.ssl) { + while(data.buf.hasRemaining()) { + ib.clear(); + int n; + ci.clear(); n = read(sc, ci); ci.flip(); + if(n < 0) { got = false; break; } + else got = (unwrap() && !isClosed()); + if(got) { ib.flip(); data.buf.put(ib); log(4, "partially got " + prBuf(data.buf)); } + else break; + } + } + else try { got = (read(sc, data.buf) >= 0); } catch(Exception x) { got = abendMsg("socket read", x); } + return got; + } + boolean put() { + wrapper = true; + boolean put = false; + log(4, "put: ob: " + prBuf(data.buf)); + try { + do { + if(con.ssl) { + if(!wrap() || isClosed()) return false; + else put = (write(sc, co) >= 0); + } else put = (write(sc, data.buf) >= 0); + } while(put && data.buf.hasRemaining()); + if(put) CS.forwCnt++; + } catch(Exception x) { put = abendMsg("socket write", x); } + return put; + } + void close() { + log(4, "terminating connection..."); + try { + if(con.ssl) { + data.buf.put(ByteBuffer.wrap("".getBytes())); + e.closeOutbound(); + wrap(); + } + sc.close(); + } catch(Exception x) {} + } + } +} +class Constellation extends Debug implements Runnable { + class Glob { + boolean doForward = true; + boolean pacingGo = false; + boolean stop = false; + boolean abend = false; + int D = 0; + int spawned = 0; + int from = 0, to = 0; + } + volatile Glob glob; + boolean mash, ssl; + int first, last, nodes; + Data data; + CS cs; + Gui.CBox cBox; + String label; + Constellation(CS cs, Gui.CBox cBox, boolean mash, boolean ssl) { + label = (ssl ? "" : "non") + "SSL " + (mash ? "MASH" : "RING"); + debid = "DEMO " + label; + this.cs = cs; + this.cBox = cBox; + this.mash = mash; + this.ssl = ssl; + if(mash) { first = CS.mp0; nodes = CS.mn; } + else { first = CS.rp0; nodes = CS.rn; } + glob = new Glob(); + } + Constellation(Constellation con) { + super(con); + this.cs = con.cs; + this.mash = con.mash; + this.ssl = con.ssl; + this.first = con.first; + this.last = con.last; + this.nodes = con.nodes; + this.glob = con.glob; + } + void stop() { glob.stop = true; } + //void reset() { + //first = mash ? CS.mp0 : CS.rp0; + //first += nodes; + //glob.doForward = true; + //glob.abend = false; + //glob.spawned = 0; + //} + public void run() { + log(4, "starting"); + glob.pacingGo = true; + runNodes(); + synchronized(CS.constls) { CS.constls.notify(); } + synchronized(cs) { + if(--CS.notFinished == 0) cs.notifyAll(); + else try { cs.wait(); } catch(InterruptedException e) {}; + } + log(1, glob.abend ? "BAD: constellation not finished correctly" : "OK, constellation finished correctly"); + if(glob.abend) CS.abend = true; + CS.spawned--; + synchronized(cs) { cs.notify(); } + } + void runNodes() { + log(1, nodes + " nodes constellation, ttl=" + CS.pttl + " starting..."); + first += (ssl ? 500 : 0); + last = first + nodes - 1; + if(data == null) + try { data = new Data(this, CS.text, CS.pttl); + } catch(Exception x) { abendMsg("creating initial data", x); return; } + synchronized(glob) { + for(int port = first; (port < first + nodes); port++) { + try { + new Thread(new Node(this, port), "Node " + port).start(); + log(3, "node " + port + " established"); + glob.spawned++; + } catch(Exception x) { glob.doForward = false; glob.abend = true; break; } + } + if(glob.doForward) log(2, "all nodes established"); + while(glob.spawned > 0) try { glob.wait(); } catch(InterruptedException e) {}; + } + log(2, "all nodes finished"); + } +} +class Gui extends Debug implements Runnable { + class Parms extends JPanel { + static final long serialVersionUID = 43; + class Parm implements ActionListener { + JComboBox valueEntry; + JLabel valueLabel; + Parm(Number[] values, Number value, String label, boolean editable, boolean rowEnd) { + log(5, "parm " + label); + valueLabel = new JLabel(label); + valueLabel.setBorder(b); + if(orientation == HORIZONTAL) gridC.gridwidth = 1; + else gridC.gridwidth = GridBagConstraints.REMAINDER; + gridL.setConstraints(valueLabel, gridC); + add(valueLabel); + valueEntry = new JComboBox(values); + valueEntry.setPreferredSize(new Dimension(prefComboWidth, prefComboHeight)); + if(value != null) valueEntry.setSelectedItem(value); + valueEntry.setEditable(editable); + valueEntry.addActionListener(this); + if(orientation == HORIZONTAL && rowEnd) gridC.gridwidth = GridBagConstraints.REMAINDER; + gridL.setConstraints(valueEntry, gridC); + add(valueEntry); + } + public void actionPerformed(ActionEvent e) { + try { setVal((Number)((JComboBox)e.getSource()).getSelectedItem()); } catch(Exception x) { log(0, x.getMessage()); } + } + void getEnv() {} + void setVal(Number v) {} + } + class Buttons extends Box { + class GoButton extends JButton implements ActionListener { + static final long serialVersionUID = 44; + GoButton() { + super("go"); + addActionListener(this); + gridC.gridwidth = 1; + gridL.setConstraints(this, gridC); + CS.go = true; + } + public void actionPerformed(ActionEvent e) { + log(5, getText() + " button pressed"); + if(getText().equals("go")) { setText("pause"); CS.go = true; CS.pacing = true; CS.isRun = true; awake(); } + else { setText("go"); CS.go = false; } + } + } + class StepButton extends JButton implements ActionListener { + static final long serialVersionUID = 44; + StepButton() { + super("step"); + addActionListener(this); + gridC.gridwidth = 1; + gridL.setConstraints(this, gridC); + } + public void actionPerformed(ActionEvent e) { + log(5, getText() + " button pressed"); + CS.go = false; CS.pacing = true; CS.isRun = true; + setGo(); + awake(); + } + } + class ResetButton extends JButton implements ActionListener { + static final long serialVersionUID = 45; + ResetButton() { + super("reset"); + addActionListener(this); + gridC.gridwidth = 1; + gridL.setConstraints(this, gridC); + } + public void actionPerformed(ActionEvent e) { CS.isRun = false; CS.isReset = true; CS.go = true; awake(); } + } + class StopButton extends JButton implements ActionListener { + static final long serialVersionUID = 46; + StopButton() { + super("end"); + addActionListener(this); + gridC.gridwidth = 1; + //gridC.gridwidth = GridBagConstraints.REMAINDER; + gridL.setConstraints(this, gridC); + } + public void actionPerformed(ActionEvent e) { closeUI(); } + } + GoButton go; + ResetButton reset; + StepButton step; + StopButton stop; + Box row1, row2; + Buttons() { + super(BoxLayout.Y_AXIS); + setBorder(b); + add(row1 = new Box(BoxLayout.X_AXIS)); + add(row2 = new Box(BoxLayout.X_AXIS)); + row1.add(go = new GoButton()); + row1.add(step = new StepButton()); + row2.add(reset = new ResetButton()); + row2.add(stop = new StopButton()); + } + void setGo() { go.setText("go"); } + void setPause() { go.setText("pause"); } + void enableStep(boolean b) { step.setEnabled(b); } + } + static final boolean EDITABLE = true; + static final boolean ROW_END = true; + boolean orientation; + EmptyBorder b = new EmptyBorder(0,7,0,7); + GridBagLayout gridL = new GridBagLayout(); + GridBagConstraints gridC = new GridBagConstraints(); + int prefComboWidth, prefComboHeight; + Buttons buttons; + Parms(boolean orientation) { + textHeight = (int)Math.round(1.5 * getFontMetrics(getFont()).getHeight()); + prefComboWidth = (int)Math.round(1.5 * getFontMetrics(getFont()).bytesWidth("000000".getBytes(), 0, 6)); + prefComboHeight = textHeight; + this.orientation = orientation; + gridC.fill = GridBagConstraints.BOTH; + setFont(new Font("SansSerif", Font.PLAIN, 9)); + setLayout(gridL); + new Parm(new Integer[] {0,1,2,3,4,5,7,9}, new Integer(CS.debug), "Debug level", !EDITABLE, !ROW_END) { + void setVal(Number v) { CS.debug = v.intValue(); } }; + new Parm(new Integer[] {0,1,2}, new Integer(CS.issl), "SSL", !EDITABLE, !ROW_END) { + void setVal(Number v) { CS.issl = v.intValue(); } }; + new Parm(new Integer[] {CS.pttl}, null, "TTL", EDITABLE, ROW_END) { + void setVal(Number v) { CS.pttl = v.intValue(); } }; + new Parm(new Double[] {(double)CS.ipace/1000}, null, "pace in secs.", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.ipace = (int)(1000.0 * v.doubleValue()); CS.pace = CS.ipace; } }; + new Parm(new Integer[] {CS.mp0}, null, "listen port of first MASH node", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.mp0 = v.intValue(); } }; + new Parm(new Integer[] {CS.rp0}, null, "listen port of first RING node", EDITABLE, ROW_END) { + void setVal(Number v) { CS.rp0 = v.intValue(); } }; + new Parm(new Integer[] {CS.mn}, null, "MASH constellation size", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.mn = v.intValue(); } }; + new Parm(new Integer[] {CS.rn}, null, "RING constellation size", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.rn = v.intValue(); } }; + new Parm(new Double[] {(double)CS.selTO/1000}, null, "I/O selection timeout in secs.", EDITABLE, ROW_END) { + void setVal(Number v) { CS.selTO = 1000 * v.intValue(); } }; + new Parm(new Integer[] {CS.fake}, null, "point of faked exception (integer)", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.fake = v.intValue(); } }; + new Parm(new Integer[] {CS.rs}, null, "random seed (integer)", EDITABLE, !ROW_END) { + void setVal(Number v) { CS.rs = v.intValue(); } }; + //gridC.gridwidth = 0; + gridC.gridwidth = GridBagConstraints.REMAINDER; + add(buttons = new Buttons()); + } + } + class CBoxBg extends BufferedImage { + final int nodeC[][]; // node centers + CBoxBg(int n) { + super(cBoxSize , cBoxSize, BufferedImage.TYPE_INT_RGB); + nodeC = new int[n][2]; + log(5, "cnstlltn bg image beg, node centers array length=" + nodeC.length); + final double a0 = Math.PI / 2; + final double aN = 2 * Math.PI / n; + final int b = 3; // border + final int r = 5; // node diameter + int cx, cy; // constellation center coordinates + cx = cy = cBoxSize/2; + int R = cBoxSize/2 - r - 2*b; // distance of node centers from constellation center + int dx, dy; // deltas of node center coordinates + final Graphics2D g2 = (Graphics2D)this.getGraphics(); + g2.setBackground(Color.WHITE); + g2.clearRect(0, 0, cBoxSize, cBoxSize); + g2.setColor(Color.BLACK); + g2.draw3DRect(b, b, cBoxSize - 2*b, cBoxSize - 2*b, true); + if(n < 2) return; + for(int i=0; i 0) { + resultBox.add(sslBox = new CcBox("w/")); + sslMashBox = sslBox.mashBox; + sslRingBox = sslBox.ringBox; + } + if(CS.issl != 1) { + resultBox.add(nonSslBox = new CcBox("non")); + nonSslMashBox = nonSslBox.mashBox; + nonSslRingBox = nonSslBox.ringBox; + } + ui.pack(); + } + void awake() { synchronized(CS.gui) { CS.gui.notify(); } } + void dashboardReset() { + //parms.buttons.setGo(); + parms.buttons.enableStep(true); + } + void closeUI() { + log(5, "closing window"); + CS.isGui = false; CS.isRun = false; CS.go = true; awake(); + } +} +public class CS extends Debug { + volatile static int + connCnt = 0, + forwCnt = 0, + notFinished = 0, + spawned = 0; + volatile static boolean abend = false; + static String text = "bla bla"; + static String clsPath; + static String cePath; + static final String sslPathSuffP = "../CS"; + static Random r; + static int + mn = 0, + mp0 = 11000, + rn = 0, + rp0 = 12000, + pttl = 3, + issl = 0, + connThreshold = 77, // connection retries threshold + connTO = 99, // connection sleep time in msecs + selTO = 999, // selection timeout in msecs + ipace = 0, + pace = 0, + rs = 0, + fake = 0; + static boolean isGui = false, isRun = true, isReset = false, go = true, pacing = false; + static ArrayList constls; + public static Constellation nonSslMashCon, nonSslRingCon, sslMashCon, sslRingCon; + static Gui gui; + static Gui.CBox nonSslMashBox, nonSslRingBox, sslMashBox, sslRingBox; + CS() { + debid = "client server DEMO"; + getArgs(); + } + boolean isArg(String a) { return System.getenv(a) != null; } + int getArgI(String a) throws Exception { + int i = -1; + if(System.getenv(a) != null) + if(!System.getenv(a).equals("")) + try { i = Integer.valueOf(System.getenv(a)); + } catch(NumberFormatException x) { throw new Exception(a + "=\terror in number format"); } + return i; + } + double getArgF(String a) throws Exception { + double d = -1; + if(System.getenv(a) != null) + if(!System.getenv(a).equals("")) + try { d = Double.valueOf(System.getenv(a)); + } catch(NumberFormatException x) { throw new Exception(a + "=\terror in number format"); } + return d; + } + int getMArg(String a) throws Exception { + int i; + if((i = getArgI(a)) == 0) throw new Exception(a + " is mandatory"); + return i; + } + void getArgs() { + try { + clsPath = getClass().getProtectionDomain().getCodeSource().getLocation().getPath(); + if(getArgI("DEB") >= 0) debug = getArgI("DEB"); + if(System.getenv("T") != null) text = System.getenv("T"); + if(getArgI("TTL") > 0) pttl = getArgI("TTL"); + if(getArgF("P") >= 0) ipace = (int)(getArgF("P") * 1000); + if(getArgI("MP0") > 0) mp0 = getArgI("MP0"); + if(getArgI("RP0") > 0) rp0 = getArgI("RP0"); + if(getArgI("N") >= 0) { mn = getArgI("N"); rn = mn; } + if(getArgI("SSL") >= 0) issl = getArgI("SSL"); + if(System.getenv("CEP") != null) cePath = System.getenv("CEP"); + else cePath = clsPath + sslPathSuffP; + if(getArgI("MN") >= 0) mn = getArgI("MN"); + if(getArgI("RN") >= 0) rn = getArgI("RN"); + if(getArgF("STO") >= 0) selTO = (int)(getArgF("STO") * 1000); + if(getArgI("FAKE") >= 0) fake = getArgI("FAKE"); + if(isArg("RS")) rs = getArgI("RS"); + if(isArg("G")) isGui = getArgI("G") == 1; + } catch(Exception x) { log(0, x.getMessage()); return; } + } + void dashboard() { + gui = new Gui(this); + log(4, "wait for args from GUI"); + synchronized(gui) { + try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); } + try { gui.wait(); } catch(InterruptedException x) {} + } + if(isGui) cboxes(); + } + void cboxes() { + log(4, "cboxes"); + try { gui.cboxes(); } catch(Exception x) { log(0, x.getMessage()); } + try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); } + nonSslMashBox = gui.nonSslMashBox; + nonSslRingBox = gui.nonSslRingBox; + sslMashBox = gui.sslMashBox; + sslRingBox = gui.sslRingBox; + + } + void constellations() { + pace = ipace; + spawned = 0; notFinished = 0; + constls = new ArrayList(); + if(mn == 1) log(0, "one-node MASH configuration not implemented"); + if(mn > 1) { + if(issl > 0) { sslMashCon = new Constellation(this, sslMashBox, true, true); constls.add(sslMashCon); } + if(issl != 1) { nonSslMashCon = new Constellation(this, nonSslMashBox, true, false); constls.add(nonSslMashCon); } + } + if(rn == 1) log(0, "one-node RING configuration not implemented"); + if(rn > 1) { + if(issl > 0) { sslRingCon = new Constellation(this, sslRingBox, false, true); constls.add(sslRingCon); } + if(issl != 1) { nonSslRingCon = new Constellation(this, nonSslRingBox, false, false); constls.add(nonSslRingCon); } + } + } + void stop() { + if(sslMashCon != null) sslMashCon.stop(); + if(sslRingCon != null) sslRingCon.stop(); + if(nonSslMashCon != null) nonSslMashCon.stop(); + if(nonSslRingCon != null) nonSslRingCon.stop(); + pacing = false; + pace = 0; + go = true; + } + void reset() { + gui.dashboardReset(); + cboxes(); + mp0 += mn; rp0 += rn; // port is unusable 30 secs after port close due to special timeout + constellations(); + isReset = false; + } + String switches(String label) { + return label + ": isGui=" + isGui + ", isRun=" + isRun + ", go=" + go + ", pacing=" + pacing + ", spawned=" + spawned; + } + void pacingGo() { for(Constellation con : constls) con.glob.pacingGo = true; } + void runGuiCon() { + synchronized(constls) { + while(isGui && isRun && spawned > 0) { + log(4, switches("runCons constls.wait")); + try { constls.wait(); } catch(InterruptedException x) {} + pacingGo(); + log(4, switches("runCons constls.paint")); + if(!isGui) break; + try { SwingUtilities.invokeAndWait(CS.gui); } catch(Exception x) { throw new Error(x); } + if(!isGui || !isRun) break; + if(!go) { + log(4, switches("runCons constls.gui.wait")); + synchronized(gui) { try { gui.wait(); } catch(InterruptedException x) {} } + } + if(!isGui || !isRun) break; + log(4, switches("runCons constls.notifyAll")); + constls.notifyAll(); + } + } + if(isGui && isRun) { + log(4, switches("runCons last repaint")); + try { SwingUtilities.invokeAndWait(CS.gui); } catch(Exception x) { throw new Error(x); } + if(!go) synchronized(gui) { try { gui.wait(); } catch(InterruptedException x) {} } + } + else { + log(4, switches("runCons stop")); + stop(); + synchronized(constls) { constls.notifyAll(); } + while(spawned > 0) synchronized(this) { try { wait(); } catch(InterruptedException x) {} } + } + } + void runCons() { + if(isGui) pacing = true; + else pacing = false; + for(Constellation con : constls) { notFinished++; new Thread(con, con.label).start(); spawned++; } + if(isGui) runGuiCon(); + else while(spawned > 0) synchronized(this) { try { wait(); } catch(InterruptedException x) {} } + } + public void run() { + if(isGui) dashboard(); + if(isRun) { + log(1, "pgm=" + clsPath + getClass().getName() + + ", ttl=" + pttl + ", pace=" + ipace + "msecs, seed=" + rs + ", SSL=" + issl + ", fake=" + fake + ", debug=" + debug); + if(issl > 0) log(3, "cePath=" + cePath); + constellations(); + if(!isGui) { r = new Random(rs); runCons(); } + else while(isGui) { + r = new Random(rs); + runCons(); + log(1, "all constellations finished"); + if(!isGui) break; + gui.parms.buttons.setGo(); + if(!go) gui.parms.buttons.step.setEnabled(false); + if(abend) { + gui.parms.buttons.go.setEnabled(false); + gui.parms.buttons.step.setEnabled(false); + gui.parms.buttons.reset.setEnabled(false); + } + try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); } + if(!isReset) synchronized(gui) { try { gui.wait(); } catch(InterruptedException x) {} } + if(isGui) { + reset(); + do synchronized(gui) { + try { SwingUtilities.invokeAndWait(gui); } catch(Exception x) { throw new Error(x); } + if(!isRun) try { gui.wait(); } catch(InterruptedException x) {} + if(isGui && isReset) reset(); + } while(isGui && !isRun); + } + } + } + log(1, "final balance, connections=" + connCnt + ", forwards=" + forwCnt); + if(gui != null) gui.ui.dispose(); + log(2, "run end"); + } + public static void main(String[] args) throws Exception { + CS cs = new CS(); + cs.run(); + cs.log(1, "cs end"); + //try { cs.run(); } catch(Exception x) { cs.log(0, "interrupted execution"); }; + } +} +// rozeznání konce ve step-módu +// pacing slide +// input fields +// ukládání parametrů +// too many open files +// exceptions +// stavové zprávy na dashboardu \ No newline at end of file