Skip to content
Next Next commit
Minor Connection Reader Optimization
  • Loading branch information
scottf committed Oct 17, 2025
commit fab777388e56a48fb977700ec7a9c45cc2aa02c1
152 changes: 91 additions & 61 deletions src/main/java/io/nats/client/impl/NatsConnectionReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,30 @@ enum Mode {
GATHER_DATA
};

enum Op {
OK(OP_OK),
MSG(OP_MSG),
PING(OP_PING),
PONG(OP_PONG),
ERR(OP_ERR),
HMSG(OP_HMSG),
INFO(OP_INFO),
UNKNOWN(UNKNOWN_OP);

public final String text;

Op(String text) {
this.text = text;
}
}

private final NatsConnection connection;

private ByteBuffer protocolBuffer; // use a byte buffer to assist character decoding

private boolean gotCR;

private String op;
private Op op;
private final char[] opArray;
private int opPos;

Expand Down Expand Up @@ -208,7 +225,7 @@ void gatherOp(int maxPos) throws IOException {
} else if (b == SP || b == TAB) { // Got a space, get the rest of the protocol line
this.op = opFor(opArray, opPos);
this.opPos = 0;
if (this.op.equals(OP_MSG) || this.op.equals(OP_HMSG)) {
if (op.equals(Op.MSG) || op.equals(Op.HMSG)) {
this.msgLinePosition = 0;
this.mode = Mode.GATHER_MSG_HMSG_PROTO;
} else {
Expand Down Expand Up @@ -352,13 +369,13 @@ void gatherMessageData(int maxPos) throws IOException {
NatsMessage m = incoming.getMessage();
this.connection.deliverMessage(m);
if (readListener != null) {
readListener.message(op, m);
readListener.message(op.text, m);
}
msgData = null;
msgDataPosition = 0;
incoming = null;
gotCR = false;
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
} else {
Expand Down Expand Up @@ -394,51 +411,64 @@ public String grabNextMessageLineElement(int max) {
return new String(this.msgLineChars, start, this.msgLinePosition-start);
}

static String opFor(char[] chars, int length) {
static Op opFor(char[] chars, int length) {
if (length == 3) {
if ((chars[0] == 'M' || chars[0] == 'm') &&
(chars[1] == 'S' || chars[1] == 's') &&
(chars[2] == 'G' || chars[2] == 'g')) {
return OP_MSG;
} else if (chars[0] == '+' &&
(chars[1] == 'O' || chars[1] == 'o') &&
if ((chars[0] == 'M' || chars[0] == 'm')) {
if ((chars[1] == 'S' || chars[1] == 's') &&
(chars[2] == 'G' || chars[2] == 'g'))
{
return Op.MSG;
}
return Op.UNKNOWN;
}
if (chars[0] == '+' &&
(chars[1] == 'O' || chars[1] == 'o') &&
(chars[2] == 'K' || chars[2] == 'k')) {
return OP_OK;
} else {
return UNKNOWN_OP;
return Op.OK;
}
} else if (length == 4) { // do them in a unique order for uniqueness when possible to branch asap
if ((chars[1] == 'I' || chars[1] == 'i') &&
(chars[0] == 'P' || chars[0] == 'p') &&
return Op.UNKNOWN;
}
if (length == 4) { // do them in a unique order for uniqueness when possible to branch asap
if ((chars[1] == 'I' || chars[1] == 'i')) {
if ((chars[0] == 'P' || chars[0] == 'p') &&
(chars[2] == 'N' || chars[2] == 'n') &&
(chars[3] == 'G' || chars[3] == 'g')) {
return OP_PING;
} else if ((chars[1] == 'O' || chars[1] == 'o') &&
(chars[0] == 'P' || chars[0] == 'p') &&
(chars[2] == 'N' || chars[2] == 'n') &&
(chars[3] == 'G' || chars[3] == 'g')) {
return OP_PONG;
} else if (chars[0] == '-' &&
(chars[1] == 'E' || chars[1] == 'e') &&
(chars[2] == 'R' || chars[2] == 'r') &&
(chars[3] == 'R' || chars[3] == 'r')) {
return OP_ERR;
} else if ((chars[0] == 'I' || chars[0] == 'i') &&
(chars[1] == 'N' || chars[1] == 'n') &&
return Op.PING;
}
return Op.UNKNOWN;
}
if ((chars[1] == 'O' || chars[1] == 'o')) {
if ((chars[0] == 'P' || chars[0] == 'p') &&
(chars[2] == 'N' || chars[2] == 'n') &&
(chars[3] == 'G' || chars[3] == 'g')) {
return Op.PONG;
}
return Op.UNKNOWN;
}
if (chars[0] == '-') {
if ((chars[1] == 'E' || chars[1] == 'e') &&
(chars[2] == 'R' || chars[2] == 'r') &&
(chars[3] == 'R' || chars[3] == 'r')) {
return Op.ERR;
}
return Op.UNKNOWN;
}
if ((chars[0] == 'I' || chars[0] == 'i')) {
if ((chars[1] == 'N' || chars[1] == 'n') &&
(chars[2] == 'F' || chars[2] == 'f') &&
(chars[3] == 'O' || chars[3] == 'o')) {
return OP_INFO;
} else if ((chars[0] == 'H' || chars[0] == 'h') &&
(chars[1] == 'M' || chars[1] == 'm') &&
(chars[2] == 'S' || chars[2] == 's') &&
(chars[3] == 'G' || chars[3] == 'g')) {
return OP_HMSG;
} else {
return UNKNOWN_OP;
return Op.INFO;
}
return Op.UNKNOWN;
}
if ((chars[0] == 'H' || chars[0] == 'h') &&
(chars[1] == 'M' || chars[1] == 'm') &&
(chars[2] == 'S' || chars[2] == 's') &&
(chars[3] == 'G' || chars[3] == 'g')) {
return Op.HMSG;
}
} else {
return UNKNOWN_OP;
}
return Op.UNKNOWN;
}

private static final int[] TENS = new int[] { 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000, 1_000_000_000};
Expand Down Expand Up @@ -467,8 +497,8 @@ public static int parseLength(String s) throws NumberFormatException {

void parseProtocolMessage() throws IOException {
try {
switch (this.op) {
case OP_MSG:
switch (op) {
case MSG:
int protocolLength = this.msgLinePosition; //This is just after the last character
int protocolLineLength = protocolLength + 4; // 4 for the "MSG "

Expand Down Expand Up @@ -505,7 +535,7 @@ void parseProtocolMessage() throws IOException {
this.msgDataPosition = 0;
this.msgLinePosition = 0;
break;
case OP_HMSG:
case HMSG:
int hProtocolLength = this.msgLinePosition; //This is just after the last character
int hProtocolLineLength = hProtocolLength + 5; // 5 for the "HMSG "

Expand Down Expand Up @@ -549,50 +579,50 @@ void parseProtocolMessage() throws IOException {
this.msgDataPosition = 0;
this.msgLinePosition = 0;
break;
case OP_OK:
case OK:
this.connection.processOK();
if (readListener != null) {
readListener.protocol(op, null);
readListener.protocol(op.text, null);
}
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
case OP_ERR:
case ERR:
String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("'", "");
this.connection.processError(errorText);
if (readListener != null) {
readListener.protocol(op, errorText);
readListener.protocol(op.text, errorText);
}
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
case OP_PING:
case PING:
this.connection.sendPong();
if (readListener != null) {
readListener.protocol(op, null);
readListener.protocol(op.text, null);
}
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
case OP_PONG:
case PONG:
this.connection.handlePong();
if (readListener != null) {
readListener.protocol(op, null);
readListener.protocol(op.text, null);
}
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
case OP_INFO:
case INFO:
String info = StandardCharsets.UTF_8.decode(protocolBuffer).toString();
this.connection.handleInfo(info);
if (readListener != null) {
readListener.protocol(op, info);
readListener.protocol(op.text, info);
}
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
break;
default:
throw new IllegalStateException("Unknown protocol operation "+op);
throw new IllegalStateException("Unknown protocol operation " + op.text);
}

} catch (IllegalStateException | NumberFormatException | NullPointerException ex) {
Expand All @@ -608,11 +638,11 @@ void encounteredProtocolError(Exception ex) throws IOException {
void fakeReadForTest(byte[] bytes) {
System.arraycopy(bytes, 0, this.buffer, 0, bytes.length);
this.bufferPosition = 0;
this.op = UNKNOWN_OP;
this.op = Op.UNKNOWN;
this.mode = Mode.GATHER_OP;
}

String currentOp() {
return this.op;
return op.text;
}
}