Skip to content

Commit 7fc5127

Browse files
Added support for connecting to specific shards of a sharded database by
specifying a shard key.
1 parent 84612bf commit 7fc5127

File tree

6 files changed

+224
-27
lines changed

6 files changed

+224
-27
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ the modules in [cx_PyOracleLib][8].
155155

156156
- Database startup and shutdown.
157157

158+
- Sharded Databases
159+
158160
- Oracle Database High Availability Features, such as FAN notifications and Transaction Guard support.
159161

160162
**DB API specification exclusions**: The time data type is not

doc/src/module.rst

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ Module Interface
2222
This method is an extension to the DB API definition.
2323

2424

25-
.. function:: Connection([user, password, dsn, mode, handle, pool, threaded, events, cclass, purity, newpassword, encoding, nencoding, edition, appcontext, tag, matchanytag])
26-
connect([user, password, dsn, mode, handle, pool, threaded, events, cclass, purity, newpassword, encoding, nencoding, edition, appcontext, tag, matchanytag])
25+
.. function:: Connection([user, password, dsn, mode, handle, pool, threaded, events, cclass, purity, newpassword, encoding, nencoding, edition, appcontext, tag, matchanytag, shardingkey, supershardingkey])
26+
connect([user, password, dsn, mode, handle, pool, threaded, events, cclass, purity, newpassword, encoding, nencoding, edition, appcontext, tag, matchanytag, shardingkey, supershardingkey])
2727

2828
Constructor for creating a connection to the database. Return a
2929
:ref:`connection object <connobj>`. All arguments are optional and can be
@@ -95,6 +95,11 @@ Module Interface
9595
Sessions are tagged when they are :meth:`released <SessionPool.release>`
9696
back to the pool.
9797

98+
The shardingkey and supershardingkey arguments, if specified, are expected
99+
to be a sequence of values which will be used to identify the database
100+
shard to connect to. Currently only strings are supported for the key
101+
values.
102+
98103

99104
.. function:: Cursor(connection)
100105

doc/src/session_pool.rst

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ SessionPool Object
99
This object is an extension to the DB API.
1010

1111

12-
.. method:: SessionPool.acquire(user=None, password=None, cclass=None, purity=cx_Oracle.ATTR_PURITY_DEFAULT, tag=None, matchanytag=False)
12+
.. method:: SessionPool.acquire(user=None, password=None, cclass=None, purity=cx_Oracle.ATTR_PURITY_DEFAULT, tag=None, matchanytag=False, shardingkey=[], supershardingkey=[])
1313

1414
Acquire a connection from the session pool and return a
1515
:ref:`connection object <connobj>`.
@@ -33,6 +33,10 @@ SessionPool Object
3333
Sessions are tagged when they are :meth:`released <SessionPool.release>`
3434
back to the pool.
3535

36+
The shardingkey and supershardingkey arguments, if specified, are expected
37+
to be a sequence of values which will be used to identify the database
38+
shard to connect to. Currently only strings are supported for the key
39+
values.
3640

3741
.. attribute:: SessionPool.busy
3842

src/Connection.c

Lines changed: 204 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ static PyTypeObject g_ConnectionType = {
231231
// structure used to help in establishing a connection
232232
//-----------------------------------------------------------------------------
233233
typedef struct {
234+
const char *encoding;
234235
udt_Buffer userNameBuffer;
235236
udt_Buffer passwordBuffer;
236237
udt_Buffer newPasswordBuffer;
@@ -243,6 +244,12 @@ typedef struct {
243244
udt_Buffer *ctxNamespaceBuffers;
244245
udt_Buffer *ctxNameBuffers;
245246
udt_Buffer *ctxValueBuffers;
247+
dpiShardingKeyColumn *shardingKeyColumns;
248+
udt_Buffer *shardingKeyBuffers;
249+
uint32_t numShardingKeyColumns;
250+
dpiShardingKeyColumn *superShardingKeyColumns;
251+
uint32_t numSuperShardingKeyColumns;
252+
udt_Buffer *superShardingKeyBuffers;
246253
} udt_ConnectionParams;
247254

248255

@@ -264,6 +271,12 @@ static void ConnectionParams_Initialize(udt_ConnectionParams *params)
264271
params->ctxNamespaceBuffers = NULL;
265272
params->ctxNameBuffers = NULL;
266273
params->ctxValueBuffers = NULL;
274+
params->numShardingKeyColumns = 0;
275+
params->shardingKeyColumns = NULL;
276+
params->shardingKeyBuffers = NULL;
277+
params->numSuperShardingKeyColumns = 0;
278+
params->superShardingKeyColumns = NULL;
279+
params->superShardingKeyBuffers = NULL;
267280
}
268281

269282

@@ -274,7 +287,7 @@ static void ConnectionParams_Initialize(udt_ConnectionParams *params)
274287
// populates the parametrs with buffers for each of these.
275288
//-----------------------------------------------------------------------------
276289
static int ConnectionParams_ProcessContext(udt_ConnectionParams *params,
277-
PyObject *context, const char *encoding)
290+
PyObject *context)
278291
{
279292
uint32_t numEntries, i;
280293
dpiAppContext *entry;
@@ -322,13 +335,13 @@ static int ConnectionParams_ProcessContext(udt_ConnectionParams *params,
322335
return -1;
323336
}
324337
if (cxBuffer_FromObject(&params->ctxNamespaceBuffers[i],
325-
PyTuple_GET_ITEM(entryObj, 0), encoding) < 0)
338+
PyTuple_GET_ITEM(entryObj, 0), params->encoding) < 0)
326339
return -1;
327340
if (cxBuffer_FromObject(&params->ctxNameBuffers[i],
328-
PyTuple_GET_ITEM(entryObj, 1), encoding) < 0)
341+
PyTuple_GET_ITEM(entryObj, 1), params->encoding) < 0)
329342
return -1;
330343
if (cxBuffer_FromObject(&params->ctxValueBuffers[i],
331-
PyTuple_GET_ITEM(entryObj, 2), encoding) < 0)
344+
PyTuple_GET_ITEM(entryObj, 2), params->encoding) < 0)
332345
return -1;
333346
entry = &params->appContext[i];
334347
entry->namespaceName = params->ctxNamespaceBuffers[i].ptr;
@@ -343,6 +356,145 @@ static int ConnectionParams_ProcessContext(udt_ConnectionParams *params,
343356
}
344357

345358

359+
//-----------------------------------------------------------------------------
360+
// ConnectionParams_ProcessShardingKeyValue()
361+
// Process a single sharding key value.
362+
//-----------------------------------------------------------------------------
363+
static int ConnectionParams_ProcessShardingKeyValue(
364+
udt_ConnectionParams *params, PyObject *value,
365+
dpiShardingKeyColumn *column, udt_Buffer *buffer)
366+
{
367+
dpiTimestamp *timestamp;
368+
PyObject *textValue;
369+
370+
if (cxString_Check(value) || PyUnicode_Check(value) ||
371+
cxBinary_Check(value)) {
372+
if (cxBuffer_FromObject(buffer, value, params->encoding) < 0)
373+
return -1;
374+
if (cxBinary_Check(value))
375+
column->oracleTypeNum = DPI_ORACLE_TYPE_RAW;
376+
else column->oracleTypeNum = DPI_ORACLE_TYPE_VARCHAR;
377+
column->nativeTypeNum = DPI_NATIVE_TYPE_BYTES;
378+
column->value.asBytes.ptr = (char*) buffer->ptr;
379+
column->value.asBytes.length = buffer->size;
380+
} else if (PyLong_Check(value) || Py_TYPE(value) == g_DecimalType) {
381+
if (PyLong_Check(value)) {
382+
column->oracleTypeNum = DPI_ORACLE_TYPE_NUMBER;
383+
column->nativeTypeNum = DPI_NATIVE_TYPE_INT64;
384+
column->value.asInt64 = PyLong_AsLong(value);
385+
if (!PyErr_Occurred())
386+
return 0;
387+
PyErr_Clear();
388+
}
389+
textValue = PyObject_Str(value);
390+
if (!textValue)
391+
return -1;
392+
if (cxBuffer_FromObject(buffer, textValue, params->encoding) < 0) {
393+
Py_CLEAR(textValue);
394+
return -1;
395+
}
396+
Py_CLEAR(textValue);
397+
column->oracleTypeNum = DPI_ORACLE_TYPE_NUMBER;
398+
column->nativeTypeNum = DPI_NATIVE_TYPE_BYTES;
399+
column->value.asBytes.ptr = (char*) buffer->ptr;
400+
column->value.asBytes.length = buffer->size;
401+
#if PY_MAJOR_VERSION < 3
402+
} else if (PyInt_Check(value)) {
403+
column->oracleTypeNum = DPI_ORACLE_TYPE_NUMBER;
404+
column->nativeTypeNum = DPI_NATIVE_TYPE_INT64;
405+
column->value.asInt64 = PyInt_AS_LONG(value);
406+
#endif
407+
} else if (PyFloat_Check(value)) {
408+
column->oracleTypeNum = DPI_ORACLE_TYPE_NUMBER;
409+
column->nativeTypeNum = DPI_NATIVE_TYPE_DOUBLE;
410+
column->value.asDouble = PyFloat_AS_DOUBLE(value);
411+
} else if (PyDateTime_Check(value) || PyDate_Check(value)) {
412+
column->oracleTypeNum = DPI_ORACLE_TYPE_DATE;
413+
column->nativeTypeNum = DPI_NATIVE_TYPE_TIMESTAMP;
414+
timestamp = &column->value.asTimestamp;
415+
timestamp->year = PyDateTime_GET_YEAR(value);
416+
timestamp->month = PyDateTime_GET_MONTH(value);
417+
timestamp->day = PyDateTime_GET_DAY(value);
418+
if (PyDateTime_Check(value)) {
419+
timestamp->hour = PyDateTime_DATE_GET_HOUR(value);
420+
timestamp->minute = PyDateTime_DATE_GET_MINUTE(value);
421+
timestamp->second = PyDateTime_DATE_GET_SECOND(value);
422+
timestamp->fsecond = PyDateTime_DATE_GET_MICROSECOND(value) * 1000;
423+
} else {
424+
timestamp->hour = 0;
425+
timestamp->minute = 0;
426+
timestamp->second = 0;
427+
timestamp->fsecond = 0;
428+
}
429+
timestamp->tzHourOffset = 0;
430+
timestamp->tzMinuteOffset = 0;
431+
} else {
432+
PyErr_SetString(g_NotSupportedErrorException,
433+
"value not supported for sharding keys");
434+
return -1;
435+
}
436+
437+
return 0;
438+
}
439+
440+
441+
//-----------------------------------------------------------------------------
442+
// ConnectionParams_ProcessShardingKey()
443+
// Process either the sharding key or the super sharding key. A sharding key
444+
// is expected to be a sequence of values. A null value or a sequence of size
445+
// 0 is ignored.
446+
//-----------------------------------------------------------------------------
447+
static int ConnectionParams_ProcessShardingKey(udt_ConnectionParams *params,
448+
PyObject *shardingKeyObj, int isSuperShardingKey)
449+
{
450+
dpiShardingKeyColumn *columns;
451+
uint32_t i, numColumns;
452+
udt_Buffer *buffers;
453+
PyObject *value;
454+
455+
// validate sharding key
456+
if (!shardingKeyObj || shardingKeyObj == Py_None)
457+
return 0;
458+
if (!PySequence_Check(shardingKeyObj)) {
459+
PyErr_SetString(PyExc_TypeError, "expecting a sequence");
460+
return -1;
461+
}
462+
numColumns = (uint32_t) PySequence_Size(shardingKeyObj);
463+
if (numColumns == 0)
464+
return 0;
465+
466+
// allocate memory for the sharding key values
467+
columns = PyMem_Malloc(numColumns * sizeof(dpiShardingKeyColumn));
468+
buffers = PyMem_Malloc(numColumns * sizeof(udt_Buffer));
469+
if (isSuperShardingKey) {
470+
params->superShardingKeyColumns = columns;
471+
params->superShardingKeyBuffers = buffers;
472+
params->numSuperShardingKeyColumns = numColumns;
473+
} else {
474+
params->shardingKeyColumns = columns;
475+
params->shardingKeyBuffers = buffers;
476+
params->numShardingKeyColumns = numColumns;
477+
}
478+
if (!columns || !buffers) {
479+
PyErr_NoMemory();
480+
return -1;
481+
}
482+
483+
// process each value
484+
for (i = 0; i < numColumns; i++) {
485+
cxBuffer_Init(&buffers[i]);
486+
value = PySequence_GetItem(shardingKeyObj, i);
487+
if (!value)
488+
return -1;
489+
if (ConnectionParams_ProcessShardingKeyValue(params, value,
490+
&columns[i], &buffers[i]) < 0)
491+
return -1;
492+
}
493+
494+
return 0;
495+
}
496+
497+
346498
//-----------------------------------------------------------------------------
347499
// ConnectionParams_Finalize()
348500
// Finalize the parameters, freeing any resources that were allocated. The
@@ -381,6 +533,26 @@ static int ConnectionParams_Finalize(udt_ConnectionParams *params)
381533
PyMem_Free(params->ctxValueBuffers);
382534
params->ctxValueBuffers = NULL;
383535
}
536+
for (i = 0; i < params->numShardingKeyColumns; i++)
537+
cxBuffer_Clear(&params->shardingKeyBuffers[i]);
538+
if (params->shardingKeyColumns) {
539+
PyMem_Free(params->shardingKeyColumns);
540+
params->shardingKeyColumns = NULL;
541+
}
542+
if (params->shardingKeyBuffers) {
543+
PyMem_Free(params->shardingKeyBuffers);
544+
params->shardingKeyBuffers = NULL;
545+
}
546+
for (i = 0; i < params->numSuperShardingKeyColumns; i++)
547+
cxBuffer_Clear(&params->superShardingKeyBuffers[i]);
548+
if (params->superShardingKeyColumns) {
549+
PyMem_Free(params->superShardingKeyColumns);
550+
params->superShardingKeyColumns = NULL;
551+
}
552+
if (params->superShardingKeyBuffers) {
553+
PyMem_Free(params->superShardingKeyBuffers);
554+
params->superShardingKeyBuffers = NULL;
555+
}
384556
return -1;
385557
}
386558

@@ -564,25 +736,25 @@ static int Connection_Init(udt_Connection *self, PyObject *args,
564736
{
565737
PyObject *tagObj, *matchAnyTagObj, *threadedObj, *eventsObj, *contextObj;
566738
PyObject *usernameObj, *passwordObj, *dsnObj, *cclassObj, *editionObj;
739+
PyObject *shardingKeyObj, *superShardingKeyObj;
567740
dpiCommonCreateParams dpiCommonParams;
568741
dpiConnCreateParams dpiCreateParams;
569742
udt_ConnectionParams params;
570743
PyObject *newPasswordObj;
571744
udt_SessionPool *pool;
572-
const char *encoding;
573745
int status, temp;
574746

575747
// define keyword arguments
576748
static char *keywordList[] = { "user", "password", "dsn", "mode",
577749
"handle", "pool", "threaded", "events", "cclass", "purity",
578750
"newpassword", "encoding", "nencoding", "edition", "appcontext",
579-
"tag", "matchanytag", NULL };
751+
"tag", "matchanytag", "shardingkey", "supershardingkey", NULL };
580752

581753
// parse arguments
582754
pool = NULL;
583755
threadedObj = eventsObj = newPasswordObj = usernameObj = NULL;
584756
passwordObj = dsnObj = cclassObj = editionObj = tagObj = NULL;
585-
matchAnyTagObj = contextObj = NULL;
757+
matchAnyTagObj = contextObj = shardingKeyObj = superShardingKeyObj = NULL;
586758
if (InitializeDPI() < 0)
587759
return -1;
588760
if (dpiContext_initCommonCreateParams(g_DpiContext, &dpiCommonParams) < 0)
@@ -593,13 +765,13 @@ static int Connection_Init(udt_Connection *self, PyObject *args,
593765
if (dpiContext_initConnCreateParams(g_DpiContext, &dpiCreateParams) < 0)
594766
return Error_RaiseAndReturnInt();
595767
if (!PyArg_ParseTupleAndKeywords(args, keywordArgs,
596-
"|OOOikO!OOOiOssOOOO", keywordList, &usernameObj, &passwordObj,
768+
"|OOOikO!OOOiOssOOOOOO", keywordList, &usernameObj, &passwordObj,
597769
&dsnObj, &dpiCreateParams.authMode,
598770
&dpiCreateParams.externalHandle, &g_SessionPoolType, &pool,
599771
&threadedObj, &eventsObj, &cclassObj, &dpiCreateParams.purity,
600772
&newPasswordObj, &dpiCommonParams.encoding,
601773
&dpiCommonParams.nencoding, &editionObj, &contextObj, &tagObj,
602-
&matchAnyTagObj))
774+
&matchAnyTagObj, &shardingKeyObj, &superShardingKeyObj))
603775
return -1;
604776
if (GetBooleanValue(threadedObj, 0, &temp) < 0)
605777
return -1;
@@ -625,25 +797,32 @@ static int Connection_Init(udt_Connection *self, PyObject *args,
625797
return -1;
626798

627799
// setup parameters
800+
ConnectionParams_Initialize(&params);
628801
if (pool) {
629802
dpiCreateParams.pool = pool->handle;
630-
encoding = pool->encodingInfo.encoding;
631-
} else encoding = GetAdjustedEncoding(dpiCommonParams.encoding);
632-
ConnectionParams_Initialize(&params);
633-
if (ConnectionParams_ProcessContext(&params, contextObj, encoding) < 0)
803+
params.encoding = pool->encodingInfo.encoding;
804+
} else params.encoding = GetAdjustedEncoding(dpiCommonParams.encoding);
805+
if (ConnectionParams_ProcessContext(&params, contextObj) < 0)
806+
return ConnectionParams_Finalize(&params);
807+
if (ConnectionParams_ProcessShardingKey(&params, shardingKeyObj, 0) < 0)
808+
return ConnectionParams_Finalize(&params);
809+
if (ConnectionParams_ProcessShardingKey(&params, superShardingKeyObj,
810+
1) < 0)
634811
return ConnectionParams_Finalize(&params);
635812
if (cxBuffer_FromObject(&params.userNameBuffer, self->username,
636-
encoding) < 0 ||
813+
params.encoding) < 0 ||
637814
cxBuffer_FromObject(&params.passwordBuffer, passwordObj,
638-
encoding) < 0 ||
639-
cxBuffer_FromObject(&params.dsnBuffer, self->dsn, encoding) < 0 ||
815+
params.encoding) < 0 ||
816+
cxBuffer_FromObject(&params.dsnBuffer, self->dsn,
817+
params.encoding) < 0 ||
640818
cxBuffer_FromObject(&params.connectionClassBuffer, cclassObj,
641-
encoding) < 0 ||
819+
params.encoding) < 0 ||
642820
cxBuffer_FromObject(&params.newPasswordBuffer, newPasswordObj,
643-
encoding) < 0 ||
821+
params.encoding) < 0 ||
644822
cxBuffer_FromObject(&params.editionBuffer, editionObj,
645-
encoding) < 0 ||
646-
cxBuffer_FromObject(&params.tagBuffer, tagObj, encoding) < 0)
823+
params.encoding) < 0 ||
824+
cxBuffer_FromObject(&params.tagBuffer, tagObj,
825+
params.encoding) < 0)
647826
return ConnectionParams_Finalize(&params);
648827
if (params.userNameBuffer.size == 0 && params.passwordBuffer.size == 0)
649828
dpiCreateParams.externalAuth = 1;
@@ -657,6 +836,11 @@ static int Connection_Init(udt_Connection *self, PyObject *args,
657836
dpiCreateParams.tagLength = params.tagBuffer.size;
658837
dpiCreateParams.appContext = params.appContext;
659838
dpiCreateParams.numAppContext = params.numAppContext;
839+
dpiCreateParams.shardingKeyColumns = params.shardingKeyColumns;
840+
dpiCreateParams.numShardingKeyColumns = params.numShardingKeyColumns;
841+
dpiCreateParams.superShardingKeyColumns = params.superShardingKeyColumns;
842+
dpiCreateParams.numSuperShardingKeyColumns =
843+
params.numSuperShardingKeyColumns;
660844
if (pool && !pool->homogeneous && pool->username && self->username) {
661845
temp = PyObject_RichCompareBool(self->username, pool->username, Py_EQ);
662846
if (temp < 0)

0 commit comments

Comments
 (0)