initReactor.
* @@ -456,7 +491,9 @@ synchronized SocketHandle getOrCreateSocketHandle(SocketAddress socketAddress, S public EndpointHandle bind(SocketAddress socketAddress, EndpointBinding endpointBinding) throws ServiceResultException { if ( endpointBinding == null || socketAddress == null || endpointBinding.endpointServer!=this ) throw new IllegalArgumentException(); - String url = endpointBinding.endpointAddress.getEndpointUrl(); + init(); + + String url = endpointBinding.endpointAddress.getEndpointUrl(); // Start endpoint handler { diff --git a/src/main/java/org/opcfoundation/ua/transport/https/HttpsSettings.java b/src/main/java/org/opcfoundation/ua/transport/https/HttpsSettings.java index be85d964..d54a7b00 100644 --- a/src/main/java/org/opcfoundation/ua/transport/https/HttpsSettings.java +++ b/src/main/java/org/opcfoundation/ua/transport/https/HttpsSettings.java @@ -43,6 +43,9 @@ public class HttpsSettings { X509KeyManager keyManager; /** Trust managers */ TrustManager trustManager; + + int maxConnections = 100; + /** Verifies whether the target hostname matches the names stored inside * the server's X.509 certificate, once the connection has been established. * This verification can provide additional guarantees of authenticity of @@ -280,7 +283,10 @@ public String getPassword() { return password; } - + public int getMaxConnections() { + return maxConnections; + } + public HttpParams getHttpParams() { return httpParams; } @@ -297,6 +303,10 @@ public void setPassword(String password) { this.password = password; } + public void setMaxConnections(int maxConnections) { + this.maxConnections = maxConnections; + } + public void readFrom(HttpsSettings src) { if (src.hostnameVerifier!=null) hostnameVerifier = src.hostnameVerifier; if (src.trustManager!=null) this.trustManager = src.trustManager; @@ -307,6 +317,7 @@ public void readFrom(HttpsSettings src) { } if ( src.httpParams != null ) this.httpParams = src.httpParams; if ( src.httpsSecurityPolicies != null ) this.httpsSecurityPolicies = src.httpsSecurityPolicies; + this.maxConnections = src.maxConnections; } @Override diff --git a/src/main/java/org/opcfoundation/ua/transport/impl/ConnectionCollection.java b/src/main/java/org/opcfoundation/ua/transport/impl/ConnectionCollection.java index 267d8e9b..ac877227 100644 --- a/src/main/java/org/opcfoundation/ua/transport/impl/ConnectionCollection.java +++ b/src/main/java/org/opcfoundation/ua/transport/impl/ConnectionCollection.java @@ -13,10 +13,10 @@ package org.opcfoundation.ua.transport.impl; import java.util.Collection; -import java.util.HashSet; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; import org.opcfoundation.ua.transport.ConnectionMonitor; import org.opcfoundation.ua.transport.ServerConnection; @@ -27,7 +27,7 @@ */ public class ConnectionCollection implements ConnectionMonitor { - SetConstructor for OpcTcpServer.
* @@ -167,6 +234,8 @@ public EndpointHandle bind(SocketAddress socketAddress, EndpointBinding endpoint if ( endpointBinding == null || socketAddress == null || endpointBinding.endpointServer!=this ) throw new IllegalArgumentException(); + init(); + String scheme = UriUtil.getTransportProtocol( endpointBinding.endpointAddress.getEndpointUrl() ); if ( !"opc.tcp".equals(scheme) ) throw new ServiceResultException(StatusCodes.Bad_UnexpectedError, "Cannot bind "+scheme+" to opc.tcp server"); SocketHandle socketHandle = getOrCreateSocketHandle(socketAddress); @@ -204,6 +273,8 @@ public void bindReverse(final SocketAddress addressToConnect, if(addressToConnect == null || endpointUrl == null) { throw new IllegalArgumentException(); } + init(); + ReverseSocketHandle socketHandle = new ReverseSocketHandle(addressToConnect); if(socketHandle.socket == null) { try { @@ -230,6 +301,8 @@ public void onClosed(ServiceResultException closeError) { @Override public void onOpen() { }}); + // start listening to messages + conn.init(); //async, do last, others listen on socket state. socketHandle.socket.connect(socketHandle.socketAddress); }catch(IOException e) { @@ -518,6 +591,24 @@ public String toString() { } } + private void init() { + if (!initialized) { + int maxConnections = application.getOpctcpSettings().getMaxConnections(); + if (maxConnections <= 0) { + throw new IllegalStateException("Maximum number of connections was not configured; must be greater than 0"); + } + this.maxConnections = maxConnections; + + int maxSecureChannelsPerConnection = application.getOpctcpSettings().getMaxSecureChannelsPerConnection(); + if (maxSecureChannelsPerConnection <= 0) { + throw new IllegalStateException( + "Maximum number of secure channels per connection was not configured; must be greater than 0"); + } + this.maxSecureChannelsPerConnection = maxSecureChannelsPerConnection; + initialized = true; + } + } + ListcancelTimeoutTimer.
*/ protected void cancelTimeoutTimer() { - // Cancel hand-shake time-out - if (timeoutTimer!=null) { - timeoutTimer.cancel(); - timeoutTimer = null; + // Cancel hand-shake time-out, note that this can never be started again, thus it wont hurt if + // this happens multiple times + final TimerTask tmp = timeoutTimer; // multithread-guard + if (tmp != null) { + tmp.cancel(); + timeoutTimer = null; timeout = null; } } @@ -604,7 +664,10 @@ protected void handleHelloMessage(Hello h) throws ServiceResultException { EndpointBindingCollection c = endpointServer.getEndpointBindings(); if (c==null) throw new ServiceResultException(Bad_UnexpectedError); - String url = trimUrl(h.getEndpointUrl()); + + checkSecureChannelLimit(); + + String url = trimUrl(h.getEndpointUrl()); logger.debug("onHello: url={}", url); @@ -703,6 +766,8 @@ protected void handleOpenSecureChannelRequest(InputMessage mb) throws ServiceRes if (req.getRequestType() == SecurityTokenRequestType.Issue) { + checkSecureChannelLimit(); + OpcTcpServerSecureChannel channel = new OpcTcpServerSecureChannel( this, endpointServer.secureChannelCounter.incrementAndGet() ); logger.debug("handleOpenSecureChannelRequest: endpointServer={} SecureChannelId={}", endpointServer, channel.getSecureChannelId()); channel.handleOpenChannel(mb, req); @@ -817,7 +882,11 @@ protected void handleSymmChunk(ByteBuffer chunk) throws ServiceResultException { } } logger.debug("handleSymmChunk: {}", secureMessageBuilder); - if (secureMessageBuilder!=null && !secureMessageBuilder.moreChunksRequired()) secureMessageBuilder = null; + if (secureMessageBuilder!=null && !secureMessageBuilder.moreChunksRequired()) { + // Ensures streams eventually close by putting a "poison pill" there + secureMessageBuilder.softClose(); + secureMessageBuilder = null; + } if (secureMessageBuilder==null) { secureMessageBuilder = new SecureInputMessageBuilder(token/*channel*/, messageListener, ctx, encoderCtx, channel.recvSequenceNumber); logger.debug("handleSymmChunk: secureMessageBuilder={}", secureMessageBuilder); @@ -1168,6 +1237,20 @@ public void run() { public void run() { try { enc.putMessage(msg.getMessage()); + + // If the message was ActivateSessionResponse, which was Good, then mark that this + // connection has a successful activated session (and thus is not removed when we are at + // max connections. If we are at max connections, per 1.05 Part 4 5.5.2: "To protect + // against misbehaving Clients and denial of service attacks, the Server shall close the + // oldest unused SecureChannelthat has no Session assigned before reaching the maximum + // number of supported SecureChannels. ") + if (msg.getMessage() instanceof ActivateSessionResponse) { + ActivateSessionResponse res = (ActivateSessionResponse) msg.getMessage(); + if ((res.getResponseHeader() != null) && (res.getResponseHeader().getServiceResult() != null) && + (res.getResponseHeader().getServiceResult().isGood())) { + hasBeenSuccessfullySessionActivated.set(true); + } + } } catch (ServiceResultException e) { msg.setError( StackUtils.toServiceResultException(e) ); } diff --git a/src/main/java/org/opcfoundation/ua/transport/tcp/nio/SecureInputMessageBuilder.java b/src/main/java/org/opcfoundation/ua/transport/tcp/nio/SecureInputMessageBuilder.java index 70cf2c28..f37caa7a 100644 --- a/src/main/java/org/opcfoundation/ua/transport/tcp/nio/SecureInputMessageBuilder.java +++ b/src/main/java/org/opcfoundation/ua/transport/tcp/nio/SecureInputMessageBuilder.java @@ -77,6 +77,13 @@ public class SecureInputMessageBuilder implements InputMessage { AtomicInteger expectedSequenceNumber; static Logger log = LoggerFactory.getLogger(SecureInputMessageBuilder.class); + /* + * Total number of bytes added via addChunk. This is a long because our max message size is an + * int, so this avoids theoretical overflow cases. Since SecureInputMessageBuilders are not + * re-used, there is no need to reset this counter. + */ + long byteCount = 0; + public interface MessageListener { /** * On message completed or error occured. @@ -172,6 +179,16 @@ public String toString() { public synchronized void addChunk(final ByteBuffer chunk) throws ServiceResultException { if (!acceptsChunks) throw new ServiceResultException(StatusCodes.Bad_UnexpectedError, "Final chunk added to message builder"); + + // prevent messages larger than our max limit + byteCount = byteCount + chunk.remaining(); + log.trace("Current message size via chunks: {}", byteCount); + if (byteCount > encoderCtx.getMaxMessageSize()) { + log.trace("Max message limits ({}) exceeded, at {}, stopping accepting chunks", encoderCtx.getMaxMessageSize(), + byteCount); + throw new ServiceResultException(StatusCodes.Bad_RequestTooLarge); + } + final int chunkNumber = chunksAdded++; chunkSequenceNumbers.add(null); int type = ChunkUtils.getMessageType(chunk); @@ -190,6 +207,13 @@ public synchronized void addChunk(final ByteBuffer chunk) throws ServiceResultEx } chunkSink.incubate(chunk); + + // IF it was the final chunk, "close" the buffer so that it knows no more data is to + // come for it. Note that all the data still comes from the sink, this just stops it after that + if (!acceptsChunks) { + chunkSink.close(); + } + Runnable handleChunkRun = new Runnable() { public void run() { if (hasError()) return; @@ -251,7 +275,8 @@ else if (token instanceof SecurityConfiguration) { StackUtils.getNonBlockingWorkExecutor().execute(handleChunkRun); // Start decoding message - if (chunkNumber==0) + // only after the final chunk has been received + if (!acceptsChunks) StackUtils.getBlockingWorkExecutor().execute(messageDecoderRun); } @@ -341,7 +366,11 @@ public void close() { chunkSink.forceClose(); } - /** + public void softClose() { + chunkSink.close(); + } + + /** *getMessage.
* * @return a {@link org.opcfoundation.ua.encoding.IEncodeable} object. diff --git a/src/main/java/org/opcfoundation/ua/utils/BouncyCastleUtils.java b/src/main/java/org/opcfoundation/ua/utils/BouncyCastleUtils.java index 6355fe82..f500ea19 100644 --- a/src/main/java/org/opcfoundation/ua/utils/BouncyCastleUtils.java +++ b/src/main/java/org/opcfoundation/ua/utils/BouncyCastleUtils.java @@ -84,8 +84,8 @@ public class BouncyCastleUtils { // startDate, expiryDate, dn, // keyPair.getPublic()); // ContentSigner signer = new JcaContentSignerBuilder("SHA1withRSA") -// .setProvider("BC").build(keyPair.getPrivate()); -// return new JcaX509CertificateConverter().setProvider("BC") +// .setProvider(CryptoUtil.getSecurityProviderName()).build(keyPair.getPrivate()); +// return new JcaX509CertificateConverter().setProvider(CryptoUtil.getSecurityProviderName()) // .getCertificate(certBldr.build(signer)); // } @@ -149,11 +149,11 @@ expiryDate, new X500Principal(commonName), ContentSigner signer; try { signer = new JcaContentSignerBuilder(CertificateUtils.getCertificateSignatureAlgorithm()) - .setProvider("BC").build(privateKey); + .setProvider(CryptoUtil.getSecurityProviderName()).build(privateKey); } catch (OperatorCreationException e) { throw new GeneralSecurityException("Failed to sign the certificate", e); } - return new JcaX509CertificateConverter().setProvider("BC") + return new JcaX509CertificateConverter().setProvider(CryptoUtil.getSecurityProviderName()) .getCertificate(certBldr.build(signer)); } @@ -299,9 +299,9 @@ public static X509Certificate generateCertificate(String domainName, PublicKey p //***** generate certificate ***********/ try { ContentSigner signer = new JcaContentSignerBuilder( - CertificateUtils.getCertificateSignatureAlgorithm()).setProvider("BC") + CertificateUtils.getCertificateSignatureAlgorithm()).setProvider(CryptoUtil.getSecurityProviderName()) .build(signerKey); - return new JcaX509CertificateConverter().setProvider("BC") + return new JcaX509CertificateConverter().setProvider(CryptoUtil.getSecurityProviderName()) .getCertificate(certBldr.build(signer)); } catch (OperatorCreationException e) { throw new GeneralSecurityException(e); @@ -326,7 +326,7 @@ public static void writeToPem(Object key, File savePath, String password, String pemWrt.writeObject(key); else { char[] pw = password.toCharArray(); - PEMEncryptor encryptor = new JcePEMEncryptorBuilder(algorithm).setProvider("BC").setSecureRandom(CryptoUtil.getRandom()).build(pw); + PEMEncryptor encryptor = new JcePEMEncryptorBuilder(algorithm).setProvider(CryptoUtil.getSecurityProviderName()).setSecureRandom(CryptoUtil.getRandom()).build(pw); pemWrt.writeObject(key, encryptor); } diff --git a/src/main/java/org/opcfoundation/ua/utils/CertificateUtils.java b/src/main/java/org/opcfoundation/ua/utils/CertificateUtils.java index bcf49cd2..021069da 100644 --- a/src/main/java/org/opcfoundation/ua/utils/CertificateUtils.java +++ b/src/main/java/org/opcfoundation/ua/utils/CertificateUtils.java @@ -21,7 +21,7 @@ import java.io.InputStream; import java.math.BigInteger; import java.net.URL; -import java.net.URLConnection; +import java.net.URLConnection; import java.security.GeneralSecurityException; import java.security.InvalidKeyException; import java.security.Key; @@ -44,7 +44,7 @@ import java.security.cert.CertificateParsingException; import java.security.cert.X509Certificate; import java.security.interfaces.RSAPrivateKey; -import java.util.Arrays; +import java.util.Arrays; import java.util.Calendar; import java.util.Collection; import java.util.Date; @@ -82,6 +82,16 @@ public static byte[] base64Decode(String string) { return getCertificateProvider().base64Decode(string); } + /** + *base64Decode.
+ * + * @param bytes the array of bytes to convert. + * @return an array of byte. + */ + public static byte[] base64Decode(byte[] bytes) { + return getCertificateProvider().base64Decode(bytes); + } + /** *base64Encode a byte array to string
* @@ -234,6 +244,13 @@ public static X509Certificate decodeX509Certificate( public static RSAPrivateKey loadFromKeyStore(URL keystoreUrl, String password) throws IOException, NoSuchAlgorithmException, CertificateException, KeyStoreException, UnrecoverableKeyException { + return loadFromKeyStore(keystoreUrl,password.toCharArray()); + } + + //Overloaded method to accept password as character array + public static RSAPrivateKey loadFromKeyStore(URL keystoreUrl, + char[] password) throws IOException, NoSuchAlgorithmException, + CertificateException, KeyStoreException, UnrecoverableKeyException { logger.debug("loadFromKeyStore: keystoreUrl={}", keystoreUrl); // Open pfx-certificate URLConnection connection = keystoreUrl.openConnection(); @@ -255,13 +272,14 @@ public static RSAPrivateKey loadFromKeyStore(URL keystoreUrl, keyStore = KeyStore.getInstance("PKCS12"); } logger.debug("loadFromKeyStore: keyStore Provider={}", keyStore.getProvider()); - keyStore.load(is, password == null ? null : password.toCharArray()); + + keyStore.load(is, password == null ? null : password); Enumerationbase64Encode a byte array to string
@@ -718,6 +730,22 @@ public static String toHex(byte[] bytes, int bytesPerRow) { return sb.toString(); } + /** + * Convert Char Array to Byte Array without String to avoid leaving traces of the intermediate results in the memory. + * + * If chars is null, returns an empty byte array. + */ + public static byte[] toBytes(char[] chars) { + if (chars == null) + return new byte[0]; + CharBuffer charBuffer = CharBuffer.wrap(chars); + ByteBuffer byteBuffer = Charset.forName("UTF-8").encode(charBuffer); + byte[] bytes = Arrays.copyOfRange(byteBuffer.array(), + byteBuffer.position(), byteBuffer.limit()); + Arrays.fill(byteBuffer.array(), (byte) 0); // clear sensitive data + return bytes; + } + /** * Verify a signature. * diff --git a/src/main/java/org/opcfoundation/ua/utils/EndpointUtil.java b/src/main/java/org/opcfoundation/ua/utils/EndpointUtil.java index 357c9c2e..48642ea1 100644 --- a/src/main/java/org/opcfoundation/ua/utils/EndpointUtil.java +++ b/src/main/java/org/opcfoundation/ua/utils/EndpointUtil.java @@ -316,7 +316,7 @@ public static void reverse(Object array) { * * @param ep a {@link org.opcfoundation.ua.core.EndpointDescription} object. * @param username a {@link java.lang.String} object. - * @param password a {@link java.lang.String} object. + * @param password the clear text password as {@link java.lang.String}. * @return user identity token * @throws org.opcfoundation.ua.common.ServiceResultException if endpoint or the stack doesn't support UserName token policy * @param byteString an array of byte. @@ -324,60 +324,87 @@ public static void reverse(Object array) { public static UserIdentityToken createUserNameIdentityToken(EndpointDescription ep, ByteString byteString, String username, String password) throws ServiceResultException { - UserTokenPolicy policy = ep.findUserTokenPolicy(UserTokenType.UserName); - if (policy==null) throw new ServiceResultException(StatusCodes.Bad_IdentityTokenRejected, "UserName not supported"); - String securityPolicyUri = policy.getSecurityPolicyUri(); - if (securityPolicyUri==null) securityPolicyUri = ep.getSecurityPolicyUri(); - SecurityPolicy securityPolicy = SecurityPolicy.getSecurityPolicy( securityPolicyUri ); - if (securityPolicy==null) securityPolicy = SecurityPolicy.NONE; - UserNameIdentityToken token = new UserNameIdentityToken(); - - token.setUserName( username ); - token.setPolicyId( policy.getPolicyId() ); - - // Encrypt the password, unless no security is defined - SecurityAlgorithm algorithm = securityPolicy.getAsymmetricEncryptionAlgorithm(); - logger.debug("createUserNameIdentityToken: algorithm={}", algorithm); - byte[] pw = password.getBytes(BinaryEncoder.UTF8); - if (algorithm == null) - token.setPassword(ByteString.valueOf(pw)); - else { - try { - byte[] c = ByteString.asByteArray(ep.getServerCertificate()); - Cert serverCert = (c == null || c.length == 0) ? null : new Cert(c); - if (byteString != null) - pw = ByteBufferUtils.concatenate(toArray(pw.length - + byteString.getLength()), pw, byteString.getValue()); - else - pw = ByteBufferUtils.concatenate(toArray(pw.length), pw); - pw = CryptoUtil.encryptAsymm(pw, serverCert.getCertificate() - .getPublicKey(), algorithm); - token.setPassword(ByteString.valueOf(pw)); - - } catch (InvalidKeyException e) { - // Server certificate does not have encrypt usage - throw new ServiceResultException( - StatusCodes.Bad_CertificateInvalid, - "Server certificate in endpoint is invalid: " - + e.getMessage()); - } catch (IllegalBlockSizeException e) { - throw new ServiceResultException( - StatusCodes.Bad_SecurityPolicyRejected, e.getClass() - .getName() + ":" + e.getMessage()); - } catch (BadPaddingException e) { - throw new ServiceResultException( - StatusCodes.Bad_CertificateInvalid, - "Server certificate in endpoint is invalid: " - + e.getMessage()); - } catch (NoSuchAlgorithmException e) { - throw new ServiceResultException(StatusCodes.Bad_InternalError, e); - } catch (NoSuchPaddingException e) { - throw new ServiceResultException(StatusCodes.Bad_InternalError, e); - } - token.setEncryptionAlgorithm(algorithm.getUri()); - } - return token; + return createUserNameIdentityToken(ep, byteString, username, password == null ? null : password.toCharArray()); } + + /** + * Create user identity token based on username and password + * + * @param ep a {@link org.opcfoundation.ua.core.EndpointDescription} object. + * @param username a {@link java.lang.String} object. + * @param password the clear text password as char array. + * @return user identity token + * @throws org.opcfoundation.ua.common.ServiceResultException if endpoint or the stack doesn't support UserName token policy + * @param byteString an array of byte. + */ + public static UserIdentityToken createUserNameIdentityToken(EndpointDescription ep, ByteString byteString, String username, char[] password) + throws ServiceResultException + { + UserTokenPolicy policy = ep.findUserTokenPolicy(UserTokenType.UserName); + if (policy==null) throw new ServiceResultException(StatusCodes.Bad_IdentityTokenRejected, "UserName not supported"); + String securityPolicyUri = policy.getSecurityPolicyUri(); + if (securityPolicyUri==null) securityPolicyUri = ep.getSecurityPolicyUri(); + SecurityPolicy securityPolicy = SecurityPolicy.getSecurityPolicy( securityPolicyUri ); + if (securityPolicy==null) securityPolicy = SecurityPolicy.NONE; + UserNameIdentityToken token = new UserNameIdentityToken(); + + token.setUserName( username ); + token.setPolicyId( policy.getPolicyId() ); + + // Encrypt the password, unless no security is defined + SecurityAlgorithm algorithm = securityPolicy.getAsymmetricEncryptionAlgorithm(); + logger.debug("createUserNameIdentityToken: algorithm={}", algorithm); + + byte[] pwTemp = CryptoUtil.toBytes(password); + + if (algorithm == null) + { + token.setPassword(ByteString.valueOf(pwTemp)); + //Clear sensitive data from memory + ByteBufferUtils.clear(pwTemp); + } + else { + try { + byte[] pw = null; + byte[] c = ByteString.asByteArray(ep.getServerCertificate()); + Cert serverCert = (c == null || c.length == 0) ? null : new Cert(c); + if (byteString != null) + pw = ByteBufferUtils.concatenate(toArray(pwTemp.length + + byteString.getLength()), pwTemp, byteString.getValue()); + else + pw = ByteBufferUtils.concatenate(toArray(pwTemp.length), pwTemp); + //Clear sensitive data from memory + ByteBufferUtils.clear(pwTemp); + byte[] pw1 = CryptoUtil.encryptAsymm(pw, serverCert.getCertificate() + .getPublicKey(), algorithm); + token.setPassword(ByteString.valueOf(pw1)); + //Clear sensitive data from memory + ByteBufferUtils.clear(pw); + + } catch (InvalidKeyException e) { + // Server certificate does not have encrypt usage + throw new ServiceResultException( + StatusCodes.Bad_CertificateInvalid, + "Server certificate in endpoint is invalid: " + + e.getMessage()); + } catch (IllegalBlockSizeException e) { + throw new ServiceResultException( + StatusCodes.Bad_SecurityPolicyRejected, e.getClass() + .getName() + ":" + e.getMessage()); + } catch (BadPaddingException e) { + throw new ServiceResultException( + StatusCodes.Bad_CertificateInvalid, + "Server certificate in endpoint is invalid: " + + e.getMessage()); + } catch (NoSuchAlgorithmException e) { + throw new ServiceResultException(StatusCodes.Bad_InternalError, e); + } catch (NoSuchPaddingException e) { + throw new ServiceResultException(StatusCodes.Bad_InternalError, e); + } + token.setEncryptionAlgorithm(algorithm.getUri()); + } + return token; + } /** * Create user identity token based on an issued token diff --git a/src/main/java/org/opcfoundation/ua/utils/IncubationQueue.java b/src/main/java/org/opcfoundation/ua/utils/IncubationQueue.java index 60e65374..9aaa5738 100644 --- a/src/main/java/org/opcfoundation/ua/utils/IncubationQueue.java +++ b/src/main/java/org/opcfoundation/ua/utils/IncubationQueue.java @@ -39,11 +39,15 @@ * q.removeNextHatchedIfAvailable(); // returns null * q.hatch("c"); * q.removeNextHatched(); // returns "c" - * - * @author Toni Kalajainen (toni.kalajainen@iki.fi) */ public class IncubationQueue- * The default handler will just log the exception as an Error to the log4j log. + * The default handler will just log the exception as an ERROR to the logger. *
* Set the handler to provide custom behavior in your application. *
diff --git a/src/main/java/org/opcfoundation/ua/utils/bytebuffer/ByteBufferUtils.java b/src/main/java/org/opcfoundation/ua/utils/bytebuffer/ByteBufferUtils.java index a0c727c8..42c6b18f 100644 --- a/src/main/java/org/opcfoundation/ua/utils/bytebuffer/ByteBufferUtils.java +++ b/src/main/java/org/opcfoundation/ua/utils/bytebuffer/ByteBufferUtils.java @@ -13,11 +13,10 @@ package org.opcfoundation.ua.utils.bytebuffer; import java.nio.ByteBuffer; +import java.util.Arrays; /** *
ByteBufferUtils class.
- * - * @author Toni Kalajainen (toni.kalajainen@vtt.fi) */ public class ByteBufferUtils { @@ -50,7 +49,7 @@ public static void copy(ByteBuffer src, ByteBuffer dst, int length) src.limit(srcLimit); dst.limit(dstLimit); } - + /** * Concatenate two arrays to one * @@ -72,5 +71,15 @@ public static byte[] concatenate(byte[]...chunks) return result; } + /** + * Fill the byte array with 0 values. + * + * @param value an array of byte. + */ + public static void clear(byte[] value) + { + Arrays.fill(value, (byte)0); + } + } diff --git a/src/main/java/org/opcfoundation/ua/utils/bytebuffer/IncubationBuffer.java b/src/main/java/org/opcfoundation/ua/utils/bytebuffer/IncubationBuffer.java index f30bd5c4..4c926dcd 100644 --- a/src/main/java/org/opcfoundation/ua/utils/bytebuffer/IncubationBuffer.java +++ b/src/main/java/org/opcfoundation/ua/utils/bytebuffer/IncubationBuffer.java @@ -25,8 +25,6 @@ * The data in ByteBuffers in read in the order they are "incubated" * The data becomes available when the ByteBuffers are "hatched" * Input stream blocks until data becomes available. - * - * @author Toni Kalajainen (toni.kalajainen@vtt.fi) */ public class IncubationBuffer extends InputStream { @@ -34,6 +32,8 @@ public class IncubationBuffer extends InputStream { protected final static ByteBuffer CLOSED_MARKER = ByteBuffer.allocate(0); protected IncubationQueueConstructor for IncubationBuffer.
@@ -43,14 +43,21 @@ public IncubationBuffer() { } /** - * Submits a byte buffer to the use of input stream - * - * @param buf byte buffer to offer for use + * Submits a byte buffer to the use of input stream, it can only be used once + * {@link #hatch(ByteBuffer)} has been called for the same buffer. */ public void incubate(ByteBuffer buf) { + // Here compared to hatch we can prevent any new buffers, as the stream already contains all the + // data it can (some of which might not yet be hatched though). + if (closed || forceClosed) { + return; + } synchronized(queue) { - queue.incubate(buf); + if (closed || forceClosed) { + return; + } + queue.incubate(buf); } } @@ -61,8 +68,16 @@ public void incubate(ByteBuffer buf) */ public void hatch(ByteBuffer buf) { + // Note that close here must not prevent hatching, since that happens in async manner. Only + // forceClose should prevent it,as that has already removed data. + if (forceClosed) { + return; + } synchronized(queue) { - queue.hatch(buf); + if (forceClosed) { + return; + } + queue.hatch(buf); } } @@ -71,9 +86,16 @@ public void hatch(ByteBuffer buf) */ public void close() { + if (closed) { + return; + } synchronized(queue) { - queue.incubate(CLOSED_MARKER); - queue.hatch(CLOSED_MARKER); + if (closed) { + return; + } + queue.incubate(CLOSED_MARKER); + queue.hatch(CLOSED_MARKER); // will notifyAll + closed = true; } } @@ -82,10 +104,16 @@ public void close() */ public void forceClose() { + if (forceClosed) { + return; + } synchronized(queue) { - queue.clear(); - queue.incubate(CLOSED_MARKER); - queue.hatch(CLOSED_MARKER); + if (forceClosed) { + return; + } + queue.clear(); // will notifyAll + forceClosed = true; + closed = true; } } @@ -96,7 +124,7 @@ public void forceClose() private ByteBuffer getByteBuffer() throws InterruptedIOException { synchronized(queue) { - if (cur==CLOSED_MARKER) return null; + if (cur==CLOSED_MARKER || forceClosed) return null; if (cur!=null && cur.hasRemaining()) return cur; if (cur!=null && !cur.hasRemaining()) cur = null; try { diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties deleted file mode 100644 index a1d2eeed..00000000 --- a/src/test/resources/log4j.properties +++ /dev/null @@ -1,27 +0,0 @@ -# Normal log properties - Logs Warnings, Errors and Severe -log4j.rootLogger=OFF, stdout - -# Print errors -#log4j.logger.org.opcfoundation.ua=ERROR, stdout - -# Print everything -log4j.logger.org.opcfoundation.ua=OFF -log4j.logger.org.opcfoundation.ua.unittests=OFF -log4j.logger.org.opcfoundation.ua.transport.tcp.impl=OFF - -# stdout outputs to System.out. -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -# stdout uses PatternLayout. -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -# The conversion pattern uses format specifiers. You might want to -# change the pattern an watch the output format change. -log4j.appender.stdout.layout.ConversionPattern=%-4r %-5p [%t] %37c %3x - %m%n - -# stdout outputs to System.out. -log4j.appender.stderr=org.apache.log4j.ConsoleAppender -log4j.appender.stderr.target=System.err -# stdout uses PatternLayout. -log4j.appender.stderr.layout=org.apache.log4j.PatternLayout -# The conversion pattern uses format specifiers. You might want to -# change the pattern an watch the output format change. -log4j.appender.stderr.layout.ConversionPattern=%-4r %-5p [%t] %37c %3x - %m%n