Skip to content
Next Next commit
Removed RabbitBrokerAdmin hard-coded module and function calls, repla…
…ced with new ControlAction lookup.

Added creation of ControlAction types in RabbitControlErlangConverter to map module and function with the converter to use.
Modified ErlangTemplate to use new ControlAction interface but left 2 methods intact for those wishing to use the older access strategy.
Modified ErlangOperations to use ControlAction interface.
  • Loading branch information
Helena Edelson committed Feb 22, 2011
commit dfa3e5bb1add8a9bdc9979f4aff53476a13d1417
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,25 @@
package org.springframework.erlang.core;

import org.springframework.erlang.OtpException;
import org.springframework.erlang.support.converter.ErlangConverter;
import org.springframework.erlang.support.ControlAction;

import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;

/**
* Operations to perform against OTP/Erlang
* @author Mark Pollack
*
* @author Helena Edelson
*/
public interface ErlangOperations {

<T> T execute(ConnectionCallback<T> action) throws OtpException;

OtpErlangObject executeErlangRpc(String module, String function, OtpErlangList args) throws OtpException;


OtpErlangObject executeErlangRpc(String module, String function, OtpErlangObject... args) throws OtpException;


OtpErlangObject executeRpc(String module, String function, Object... args) throws OtpException;

Object executeAndConvertRpc(String module, String function, ErlangConverter converterToUse, Object... args) throws OtpException;

Object executeAndConvertRpc(String module, String function, Object... args) throws OtpException;
OtpErlangObject executeErlangRpc(String module, String function, OtpErlangList args) throws OtpException;

OtpErlangObject executeRpc(ControlAction action, Object... args) throws OtpException;

Object executeAndConvertRpc(ControlAction action, Object... args) throws OtpException;

Object executeAndConvertRpc(String module, String function, Object... args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.springframework.erlang.OtpException;
import org.springframework.erlang.connection.Connection;
import org.springframework.erlang.connection.ConnectionFactory;
import org.springframework.erlang.connection.ConnectionFactoryUtils;
import org.springframework.erlang.support.ControlAction;
import org.springframework.erlang.support.ErlangAccessor;
import org.springframework.erlang.support.ErlangUtils;
import org.springframework.erlang.support.converter.ErlangConverter;
Expand All @@ -32,122 +34,111 @@

/**
* @author Mark Pollack
* @author Helena Edelson
*/
public class ErlangTemplate extends ErlangAccessor implements ErlangOperations {


private volatile ErlangConverter erlangConverter = new SimpleErlangConverter();

public ErlangTemplate(ConnectionFactory connectionFactory) {
setConnectionFactory(connectionFactory);
afterPropertiesSet();
}

public OtpErlangObject executeErlangRpc(final String module, final String function, final OtpErlangList args) {
return execute(new ConnectionCallback<OtpErlangObject>() {
public OtpErlangObject doInConnection(Connection connection) throws Exception {
logger.debug("Sending RPC for module [" + module + "] function [" + function + "] args [" + args);
connection.sendRPC(module, function, args);
//TODO consider dedicated response object.
OtpErlangObject response = connection.receiveRPC();
logger.debug("Response received = " + response.toString());
handleResponseError(module, function, response);
return response;
}
});
}

public void handleResponseError(String module, String function, OtpErlangObject result) {
//{badrpc,{'EXIT',{undef,[{rabbit_access_control,list_users,[[]]},{rpc,'-handle_call/3-fun-0-',5}]}}}

if (result instanceof OtpErlangTuple) {
OtpErlangTuple msg = (OtpErlangTuple)result;
if (msg.elementAt(0) instanceof OtpErlangAtom)
{
OtpErlangAtom responseAtom = (OtpErlangAtom)msg.elementAt(0);
//TODO consider error handler strategy.
if (responseAtom.atomValue().equals("badrpc")) {
if (msg.elementAt(1) instanceof OtpErlangTuple) {
throw new ErlangBadRpcException( (OtpErlangTuple)msg.elementAt(1));
} else {
throw new ErlangBadRpcException( msg.elementAt(1).toString());
}
} else if (responseAtom.atomValue().equals("error")) {
if (msg.elementAt(1) instanceof OtpErlangTuple) {
throw new ErlangErrorRpcException( (OtpErlangTuple)msg.elementAt(1));
} else {
throw new ErlangErrorRpcException( msg.elementAt(1).toString());
}
}
}
}
}

public OtpErlangObject executeErlangRpc(String module, String function, OtpErlangObject... args) {
return executeRpc(module, function, new OtpErlangList(args));
}

public OtpErlangObject executeRpc(String module, String function, Object... args) {
return executeErlangRpc(module, function, (OtpErlangList) erlangConverter.toErlang(args));
}

public Object executeAndConvertRpc(String module, String function, ErlangConverter converterToUse, Object... args) {
return converterToUse.fromErlang(executeRpc(module, function, converterToUse.toErlang(args)));
}

public Object executeAndConvertRpc(String module, String function, Object... args) {
return erlangConverter.fromErlangRpc(module, function, executeErlangRpc(module, function, (OtpErlangList)erlangConverter.toErlang(args)));
}

public ErlangConverter getErlangConverter() {
return erlangConverter;
}

public void setErlangConverter(ErlangConverter erlangConverter) {
this.erlangConverter = erlangConverter;
}

public <T> T execute(ConnectionCallback<T> action) throws OtpException {

Assert.notNull(action, "Callback object must not be null");
Connection con = null;
try {
con = createConnection();
return action.doInConnection(con);
}
catch (OtpException ex) {
throw ex;
}
catch (Exception ex) {
throw convertOtpAccessException(ex);
}
finally {
org.springframework.erlang.connection.ConnectionFactoryUtils.releaseConnection(con, getConnectionFactory());
}
// TODO: physically close and reopen the connection if there is an exception

}

/**
* Convert the specified checked exception to
* a Spring runtime exception equivalent.
* <p>The default implementation delegates to the
* {@link org.springframework.erlang.support.ErlangUtils#convertOtpAccessException} method.
* @param ex the original checked {@link Exception} to convert
* @return the Spring runtime wrapping <code>ex</code>
* @see org.springframework.erlang.support.ErlangUtils#convertOtpAccessException
*/
protected OtpException convertOtpAccessException(Exception ex) {
return ErlangUtils.convertOtpAccessException(ex);
}









private volatile ErlangConverter erlangConverter = new SimpleErlangConverter();

public ErlangTemplate(ConnectionFactory connectionFactory) {
setConnectionFactory(connectionFactory);
afterPropertiesSet();
}

public OtpErlangObject executeErlangRpc(final String module, final String function, final OtpErlangList args) {
return execute(new ConnectionCallback<OtpErlangObject>() {
public OtpErlangObject doInConnection(Connection connection) throws Exception {
logger.debug("Sending RPC for module [" + module + "] function [" + function + "] args [" + args);
connection.sendRPC(module, function, args);
//TODO consider dedicated response object.
OtpErlangObject response = connection.receiveRPC();
logger.debug("Response received = " + response.toString());
handleResponseError(module, function, response);
return response;
}
});
}

public OtpErlangObject executeRpc(ControlAction action, Object... args) {
return executeErlangRpc(action.getModule(), action.getFunction(), (OtpErlangList) action.getConverter().toErlang(args));
}

public Object executeAndConvertRpc(ControlAction action, Object... args) {
return action.getConverter().fromErlang(executeRpc(action, args));
}

public Object executeAndConvertRpc(String module, String function, Object... args) {
return erlangConverter.fromErlangRpc(module, function, executeErlangRpc(module, function, (OtpErlangList) erlangConverter.toErlang(args)));
}

public void handleResponseError(String module, String function, OtpErlangObject result) {
//{badrpc,{'EXIT',{undef,[{rabbit_access_control,list_users,[[]]},{rpc,'-handle_call/3-fun-0-',5}]}}}

if (result instanceof OtpErlangTuple) {
OtpErlangTuple msg = (OtpErlangTuple) result;
if (msg.elementAt(0) instanceof OtpErlangAtom) {
OtpErlangAtom responseAtom = (OtpErlangAtom) msg.elementAt(0);
//TODO consider error handler strategy.
if (responseAtom.atomValue().equals("badrpc")) {
if (msg.elementAt(1) instanceof OtpErlangTuple) {
throw new ErlangBadRpcException((OtpErlangTuple) msg.elementAt(1));
} else {
throw new ErlangBadRpcException(msg.elementAt(1).toString());
}
} else if (responseAtom.atomValue().equals("error")) {
if (msg.elementAt(1) instanceof OtpErlangTuple) {
throw new ErlangErrorRpcException((OtpErlangTuple) msg.elementAt(1));
} else {
throw new ErlangErrorRpcException(msg.elementAt(1).toString());
}
}
}
}
}


public ErlangConverter getErlangConverter() {
return erlangConverter;
}

public void setErlangConverter(ErlangConverter erlangConverter) {
this.erlangConverter = erlangConverter;
}

public <T> T execute(ConnectionCallback<T> action) throws OtpException {

Assert.notNull(action, "Callback object must not be null");
Connection con = null;
try {
con = createConnection();
return action.doInConnection(con);
}
catch (OtpException ex) {
throw ex;
}
catch (Exception ex) {
throw convertOtpAccessException(ex);
}
finally {
ConnectionFactoryUtils.releaseConnection(con, getConnectionFactory());
}
// TODO: physically close and reopen the connection if there is an exception

}

/**
* Convert the specified checked exception to
* a Spring runtime exception equivalent.
* <p>The default implementation delegates to the
* {@link org.springframework.erlang.support.ErlangUtils#convertOtpAccessException} method.
* @param ex the original checked {@link Exception} to convert
* @return the Spring runtime wrapping <code>ex</code>
* @see org.springframework.erlang.support.ErlangUtils#convertOtpAccessException
*/
protected OtpException convertOtpAccessException(Exception ex) {
return ErlangUtils.convertOtpAccessException(ex);
}


}
Loading