165165 type = {DataType .STRING },
166166 optional = true ,
167167 defaultValue = "-" ),
168+ @ Parameter (
169+ name = "body.consumer.key" ,
170+ description = "Consumer key used for calling endpoints protected by OAuth 2.0 if it's " +
171+ "required to be sent in token request body" ,
172+ type = {DataType .STRING },
173+ optional = true ,
174+ defaultValue = "-" ),
175+ @ Parameter (
176+ name = "body.consumer.secret" ,
177+ description = "Consumer secret used for calling endpoints protected by OAuth 2.0" ,
178+ type = {DataType .STRING },
179+ optional = true ,
180+ defaultValue = "-" ),
168181 @ Parameter (
169182 name = "token.url" ,
170183 description = "Token URL to generate a new access tokens " +
179192 type = {DataType .STRING },
180193 optional = true ,
181194 defaultValue = "-" ),
195+ @ Parameter (
196+ name = HttpConstants .OAUTH2_SCOPE_PARAMETER_NAME ,
197+ description = "Standard OAuth 2.0 scope parameter" ,
198+ type = {DataType .STRING },
199+ optional = true ,
200+ defaultValue = "default"
201+ ),
182202 @ Parameter (
183203 name = "headers" ,
184204 description = "HTTP request headers in format `\" '<key>:<value>','<key>:<value>'\" `.\n " +
432452)
433453public class HttpSink extends Sink {
434454 private static final Logger log = Logger .getLogger (HttpSink .class );
435- String mapType ;
436- Option httpHeaderOption ;
437- Option httpMethodOption ;
438455 protected String streamID ;
439456 protected String consumerKey ;
440457 protected String consumerSecret ;
441- private String authorizationHeader ;
442458 protected String userName ;
443459 protected String userPassword ;
444460 protected ClientConnector staticClientConnector ;
445461 protected Option publisherURLOption ;
462+ protected SiddhiAppContext siddhiAppContext ;
463+ protected String oauthUsername ;
464+ protected String oauthUserPassword ;
465+ protected String authType ;
466+ protected String tokenURL ;
467+ String mapType ;
468+ Option httpHeaderOption ;
469+ Option httpMethodOption ;
470+ private String authorizationHeader ;
446471 private String clientStoreFile ;
447472 private String clientStorePass ;
448473 private int socketIdleTimeout ;
@@ -452,20 +477,18 @@ public class HttpSink extends Sink {
452477 private String parametersList ;
453478 private String clientBootstrapConfiguration ;
454479 private ConfigReader configReader ;
455- protected SiddhiAppContext siddhiAppContext ;
456- protected String oauthUsername ;
457- protected String oauthUserPassword ;
458480 private Option refreshToken ;
459- protected String authType ;
460481 private AccessTokenCache accessTokenCache = AccessTokenCache .getInstance ();
461- protected String tokenURL ;
462482 private String hostnameVerificationEnabled ;
463483 private String sslVerificationDisabled ;
464484 private Executor executor = null ;
465485 private String publisherURL ;
466486 protected SinkMetrics metrics ;
467487 protected long startTime ;
468488 protected long endTime ;
489+ private String bodyConsumerKey ;
490+ private String bodyConsumerSecret ;
491+ private String oauth2Scope ;
469492
470493 private DefaultHttpWsConnectorFactory httpConnectorFactory ;
471494 private ProxyServerConfiguration proxyServerConfiguration ;
@@ -519,6 +542,9 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde
519542 this .httpMethodOption = optionHolder .getOrCreateOption (HttpConstants .METHOD , HttpConstants .DEFAULT_METHOD );
520543 this .consumerKey = optionHolder .validateAndGetStaticValue (HttpConstants .CONSUMER_KEY , EMPTY_STRING );
521544 this .consumerSecret = optionHolder .validateAndGetStaticValue (HttpConstants .CONSUMER_SECRET , EMPTY_STRING );
545+ this .bodyConsumerKey = optionHolder .validateAndGetStaticValue (HttpConstants .BODY_CONSUMER_KEY , EMPTY_STRING );
546+ this .bodyConsumerSecret = optionHolder .validateAndGetStaticValue (HttpConstants .BODY_CONSUMER_SECRET ,
547+ EMPTY_STRING );
522548 this .oauthUsername = optionHolder .validateAndGetStaticValue (HttpConstants .RECEIVER_OAUTH_USERNAME ,
523549 EMPTY_STRING );
524550 this .oauthUserPassword = optionHolder .validateAndGetStaticValue (HttpConstants .RECEIVER_OAUTH_PASSWORD ,
@@ -527,6 +553,8 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde
527553 this .tokenURL = optionHolder .validateAndGetStaticValue (HttpConstants .TOKEN_URL , EMPTY_STRING );
528554 this .clientStoreFile = optionHolder .validateAndGetStaticValue (HttpConstants .CLIENT_TRUSTSTORE_PATH_PARAM ,
529555 HttpSinkUtil .trustStorePath (configReader ));
556+ this .oauth2Scope = optionHolder .validateAndGetStaticValue (HttpConstants .OAUTH2_SCOPE_PARAMETER_NAME ,
557+ "default" );
530558 clientStorePass = optionHolder .validateAndGetStaticValue (HttpConstants .CLIENT_TRUSTSTORE_PASSWORD_PARAM ,
531559 HttpSinkUtil .trustStorePassword (configReader ));
532560 socketIdleTimeout = Integer .parseInt (optionHolder .validateAndGetStaticValue
@@ -554,6 +582,8 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde
554582 authType = HttpConstants .BASIC_AUTH ;
555583 } else if ((!HttpConstants .EMPTY_STRING .equals (consumerKey )
556584 && !HttpConstants .EMPTY_STRING .equals (consumerSecret )) ||
585+ (!HttpConstants .EMPTY_STRING .equals (bodyConsumerKey )
586+ && !HttpConstants .EMPTY_STRING .equals (bodyConsumerSecret )) ||
557587 (!HttpConstants .EMPTY_STRING .equals (oauthUsername )
558588 && !HttpConstants .EMPTY_STRING .equals (oauthUserPassword ))) {
559589 authType = HttpConstants .OAUTH ;
@@ -676,6 +706,7 @@ protected int sendRequest(Object payload, DynamicOptions dynamicOptions, List<He
676706 clientConnector .getPublisherURL () + ", " + e + ". Message dropped." );
677707 }
678708 HttpCarbonMessage response = listener .getHttpResponseMessage ();
709+ log .info (" Response: *********** " + response .getNettyHttpResponse ().status ());
679710 return response .getNettyHttpResponse ().status ().code ();
680711 } else {
681712 HttpResponseFuture responseFuture = clientConnector .send (cMessage );
@@ -707,7 +738,14 @@ protected void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, L
707738 ClientConnector clientConnector )
708739 throws ConnectionUnavailableException {
709740 //generate encoded base64 auth for getting refresh token
710- String consumerKeyValue = consumerKey + ":" + consumerSecret ;
741+ String consumerKeyValue ;
742+
743+ if (!HttpConstants .EMPTY_STRING .equals (this .consumerKey )
744+ && !HttpConstants .EMPTY_STRING .equals (this .consumerSecret )) {
745+ consumerKeyValue = consumerKey + ":" + consumerSecret ;
746+ } else {
747+ consumerKeyValue = bodyConsumerKey + ":" + bodyConsumerSecret ;
748+ }
711749 String encodedAuth = "Basic " + encodeBase64 (consumerKeyValue )
712750 .replaceAll (HttpConstants .NEW_LINE , HttpConstants .EMPTY_STRING );
713751 //check the availability of access token in the header
@@ -858,14 +896,15 @@ private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOpti
858896 void getAccessToken (DynamicOptions dynamicOptions , String encodedAuth , String tokenURL ) {
859897 this .tokenURL = tokenURL ;
860898 HttpsClient httpsClient = new HttpsClient ();
861- if (!HttpConstants .EMPTY_STRING .equals (oauthUsername ) &&
862- !HttpConstants .EMPTY_STRING .equals (oauthUserPassword )) {
863- httpsClient .getPasswordGrantAccessToken (tokenURL , clientStoreFile ,
864- clientStorePass , oauthUsername , oauthUserPassword , encodedAuth );
865- } else if (!HttpConstants .EMPTY_STRING .equals (refreshToken .getValue (dynamicOptions )) ||
899+ if (!HttpConstants .EMPTY_STRING .equals (refreshToken .getValue (dynamicOptions )) ||
866900 accessTokenCache .getRefreshtoken (encodedAuth ) != null ) {
867901 httpsClient .getRefreshGrantAccessToken (tokenURL , clientStoreFile ,
868- clientStorePass , encodedAuth , refreshToken .getValue (dynamicOptions ));
902+ clientStorePass , encodedAuth , refreshToken .getValue (dynamicOptions ), oauthUsername ,
903+ oauthUserPassword , bodyConsumerKey , bodyConsumerSecret , oauth2Scope );
904+ } else if (!HttpConstants .EMPTY_STRING .equals (oauthUsername ) &&
905+ !HttpConstants .EMPTY_STRING .equals (oauthUserPassword )) {
906+ httpsClient .getPasswordGrantAccessToken (tokenURL , clientStoreFile , clientStorePass , oauthUsername ,
907+ oauthUserPassword , encodedAuth , bodyConsumerKey , bodyConsumerSecret , oauth2Scope );
869908 } else {
870909 httpsClient .getClientGrantAccessToken (tokenURL , clientStoreFile ,
871910 clientStorePass , encodedAuth );
@@ -1062,7 +1101,8 @@ public ClientConnector createClientConnector(DynamicOptions dynamicOptions) {
10621101 publisherURL = publisherURLOption .getValue (dynamicOptions );
10631102 }
10641103 if (authType .equals (HttpConstants .OAUTH )) {
1065- if (EMPTY_STRING .equals (consumerSecret ) || EMPTY_STRING .equals (consumerKey )) {
1104+ if ((EMPTY_STRING .equals (consumerSecret ) || EMPTY_STRING .equals (consumerKey ))
1105+ && (EMPTY_STRING .equals (bodyConsumerKey ) || EMPTY_STRING .equals (bodyConsumerSecret ))) {
10661106 throw new SiddhiAppCreationException (HttpConstants .CONSUMER_KEY + " and " +
10671107 HttpConstants .CONSUMER_SECRET + " found empty but it is Mandatory field in " +
10681108 HttpConstants .HTTP_SINK_ID + " in " + streamID );
0 commit comments