Skip to content

Commit 2112982

Browse files
Added support for receiving notifications when AQ messages are available to be
dequeued.
1 parent 6583cdf commit 2112982

File tree

10 files changed

+280
-165
lines changed

10 files changed

+280
-165
lines changed

doc/src/connection.rst

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -480,11 +480,18 @@ Connection Object
480480
This attribute is an extension to the DB API definition.
481481

482482

483-
.. method:: Connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE, protocol=cx_Oracle.SUBSCR_PROTO_OCI, callback=None, timeout=0, operations=OPCODE_ALLOPS, port=0, qos=0, ipAddress=None, groupingClass=0, groupingValue=0, groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY)
483+
.. method:: Connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE, protocol=cx_Oracle.SUBSCR_PROTO_OCI, callback=None, timeout=0, operations=OPCODE_ALLOPS, port=0, qos=0, ipAddress=None, groupingClass=0, groupingValue=0, groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY, name=None)
484484

485-
Return a new :ref:`subscription object <subscrobj>` using the connection.
486-
Currently the namespace and protocol parameters cannot have any other
487-
meaningful values.
485+
Return a new :ref:`subscription object <subscrobj>` that receives
486+
notifications for events that take place in the database that match the
487+
given parameters.
488+
489+
The namespace parameter specifies the namespace the subscription uses. It
490+
can be one of :data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE` or
491+
:data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`.
492+
493+
The protocol parameter specifies the protocol to use when notifications are
494+
sent. Currently the only valid value is :data:`cx_Oracle.SUBSCR_PROTO_OCI`.
488495

489496
The callback is expected to be a callable that accepts a single parameter.
490497
A :ref:`message object <msgobjects>` is passed to this callback whenever a
@@ -496,11 +503,12 @@ Connection Object
496503

497504
The operations parameter enables filtering of the messages that are sent
498505
(insert, update, delete). The default value will send notifications for all
499-
operations.
506+
operations. This parameter is only used when the namespace is set to
507+
:data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE`.
500508

501509
The port parameter specifies the listening port for callback notifications
502510
from the database server. If not specified, an unused port will be selected
503-
by the database.
511+
by the Oracle Client libraries.
504512

505513
The qos parameter specifies quality of service options. It should be one or
506514
more of the following flags, OR'ed together:
@@ -510,9 +518,10 @@ Connection Object
510518
:data:`cx_Oracle.SUBSCR_QOS_QUERY`,
511519
:data:`cx_Oracle.SUBSCR_QOS_BEST_EFFORT`.
512520

513-
The ipAddress parameter specifies the IP address (IPv4 or IPv6) to bind for
514-
callback notifications from the database server. If not specified, the
515-
client IP address will be determined by the Oracle Client libraries.
521+
The ipAddress parameter specifies the IP address (IPv4 or IPv6) in standard
522+
string notation to bind for callback notifications from the database
523+
server. If not specified, the client IP address will be determined by the
524+
Oracle Client libraries.
516525

517526
The groupingClass parameter specifies what type of grouping of
518527
notifications should take place. Currently, if set, this value can only be
@@ -522,18 +531,30 @@ Connection Object
522531
values :data:`cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY` (the default) or
523532
:data:`cx_Oracle.SUBSCR_GROUPING_TYPE_LAST`.
524533

534+
The name parameter is used to identify the subscription and is specific to
535+
the selected namespace. If the namespace parameter is
536+
:data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE` then the name is optional and
537+
can be any value. If the namespace parameter is
538+
:data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`, however, the name must be in the
539+
format '<QUEUE_NAME>' for single consumer queues and
540+
'<QUEUE_NAME>:<CONSUMER_NAME>' for multiple consumer queues, and identifies
541+
the queue that will be monitored for messages. The queue name may include
542+
the schema, if needed.
543+
525544
*New in version 6.4:* The parameters ipAddress, groupingClass,
526-
groupingValue and groupingType were added.
545+
groupingValue, groupingType and name were added.
527546

528547
.. note::
529548

530549
This method is an extension to the DB API definition.
531550

532551
.. note::
533552

534-
Do not close the connection before the subscription object is deleted
535-
or the subscription object will not be deregistered in the database.
536-
This is done automatically if connection.close() is never called.
553+
The subscription can be deregistered in the database by calling the
554+
function :meth:`~Connection.unsubscribe()`. If this method is not
555+
called and the connection that was used to create the subscription is
556+
explictly closed using the function :meth:`~Connection.close()`, the
557+
subscription will not be deregistered in the database.
537558

538559

539560
.. attribute:: Connection.tnsentry
@@ -546,6 +567,16 @@ Connection Object
546567
This attribute is an extension to the DB API definition.
547568

548569

570+
.. method:: Connection.unsubscribe(subscr)
571+
572+
Unsubscribe from events in the database that were originally subscribed to
573+
using :meth:`~Connection.subscribe()`. The connection used to unsubscribe
574+
should be the same one used to create the subscription, or should access
575+
the same database and be connected as the same user name.
576+
577+
.. versionadded:: 6.4
578+
579+
549580
.. attribute:: Connection.username
550581

551582
This read-only attribute returns the name of the user which established the

doc/src/module.rst

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,12 @@ values for the :attr:`Message.type` attribute of the messages that are sent
651651
for subscriptions created by the :meth:`Connection.subscribe()` method.
652652

653653

654+
.. data:: EVENT_AQ
655+
656+
This constant is used to specify that one or more messages are available
657+
for dequeuing on the queue specified when the subscription was created.
658+
659+
654660
.. data:: EVENT_DEREG
655661

656662
This constant is used to specify that the subscription has been
@@ -847,12 +853,15 @@ These constants are extensions to the DB API definition. They are possible
847853
values for the namespace parameter of the :meth:`Connection.subscribe()`
848854
method.
849855

856+
.. data:: SUBSCR_NAMESPACE_AQ
857+
858+
This constant is used to specify that notifications should be sent when a
859+
queue has messages available to dequeue.
850860

851861
.. data:: SUBSCR_NAMESPACE_DBCHANGE
852862

853863
This constant is used to specify that database change notification or query
854-
change notification messages are to be sent. This is the default value and
855-
currently the only value that is supported.
864+
change notification messages are to be sent. This is the default value.
856865

857866

858867
Subscription Protocols

doc/src/subscription.rst

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,23 @@ Subscription Object
3030
This attribute was never intended to be exposed.
3131

3232

33+
.. attribute:: Subscription.ipAddress
34+
35+
This read-only attribute returns the IP address used for callback
36+
notifications from the database server. If not set during construction,
37+
this value is None.
38+
39+
.. versionadded:: 6.4
40+
41+
42+
.. attribute:: Subscription.name
43+
44+
This read-only attribute returns the name used to register the subscription
45+
when it was created.
46+
47+
.. versionadded:: 6.4
48+
49+
3350
.. attribute:: Subscription.namespace
3451

3552
This read-only attribute returns the namespace used to register the
@@ -43,13 +60,6 @@ Subscription Object
4360
subscription.
4461

4562

46-
.. attribute:: Subscription.ipAddress
47-
48-
This read-only attribute returns the IP address used for callback
49-
notifications from the database server. If not set during construction,
50-
this value is None.
51-
52-
5363
.. attribute:: Subscription.port
5464

5565
This read-only attribute returns the port used for callback notifications
@@ -103,12 +113,6 @@ Message Objects
103113
the notification.
104114

105115

106-
.. attribute:: Message.txid
107-
108-
This read-only attribute returns the id of the transaction that generated
109-
the notification.
110-
111-
112116
.. attribute:: Message.queries
113117

114118
This read-only attribute returns a list of message query objects that give
@@ -117,6 +121,25 @@ Message Objects
117121
:data:`~cx_Oracle.SUBSCR_QOS_QUERY` when the subscription was created.
118122

119123

124+
.. attribute:: Message.queueName
125+
126+
This read-only attribute returns the name of the queue which generated the
127+
notification. It will only be populated if the subscription was created
128+
with the namespace :data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`.
129+
130+
.. versionadded:: 6.4
131+
132+
133+
.. attribute:: Message.consumerName
134+
135+
This read-only attribute returns the name of the consumer which generated
136+
the notification. It will be populated if the subscription was created with
137+
the namespace :data:`cx_Oracle.SUBSCR_NAMESPACE_AQ` and the queue is a
138+
multiple consumer queue.
139+
140+
.. versionadded:: 6.4
141+
142+
120143
.. attribute:: Message.subscription
121144

122145
This read-only attribute returns the subscription object for which this
@@ -131,6 +154,12 @@ Message Objects
131154
:data:`~cx_Oracle.SUBSCR_QOS_QUERY` when the subscription was created.
132155

133156

157+
.. attribute:: Message.txid
158+
159+
This read-only attribute returns the id of the transaction that generated
160+
the notification.
161+
162+
134163
.. attribute:: Message.type
135164

136165
This read-only attribute returns the type of message that has been sent.

samples/AdvancedQueuing.py

Lines changed: 12 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#------------------------------------------------------------------------------
2-
# Copyright 2016, 2017, Oracle and/or its affiliates. All rights reserved.
2+
# Copyright 2016, 2018, Oracle and/or its affiliates. All rights reserved.
33
#
44
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
55
#
@@ -10,7 +10,7 @@
1010
#------------------------------------------------------------------------------
1111
# AdvancedQueuing.py
1212
# This script demonstrates how to use advanced queuing using cx_Oracle. It
13-
# creates a simple type and enqueues and dequeues a few objects.
13+
# makes use of a simple type and queue created in the sample setup.
1414
#
1515
# This script requires cx_Oracle 5.3 and higher.
1616
#------------------------------------------------------------------------------
@@ -29,44 +29,17 @@
2929
connection = cx_Oracle.Connection(SampleEnv.MAIN_CONNECT_STRING)
3030
cursor = connection.cursor()
3131

32-
# drop queue table, if present
33-
cursor.execute("""
34-
select count(*)
35-
from user_tables
36-
where table_name = :name""", name = QUEUE_TABLE_NAME)
37-
count, = cursor.fetchone()
38-
if count > 0:
39-
print("Dropping queue table...")
40-
cursor.callproc("dbms_aqadm.drop_queue_table", (QUEUE_TABLE_NAME, True))
41-
42-
# drop type, if present
43-
cursor.execute("""
44-
select count(*)
45-
from user_types
46-
where type_name = :name""", name = BOOK_TYPE_NAME)
47-
count, = cursor.fetchone()
48-
if count > 0:
49-
print("Dropping books type...")
50-
cursor.execute("drop type %s" % BOOK_TYPE_NAME)
51-
52-
# create type
53-
print("Creating books type...")
54-
cursor.execute("""
55-
create type %s as object (
56-
title varchar2(100),
57-
authors varchar2(100),
58-
price number(5,2)
59-
);""" % BOOK_TYPE_NAME)
60-
61-
# create queue table and quueue and start the queue
62-
print("Creating queue table...")
63-
cursor.callproc("dbms_aqadm.create_queue_table",
64-
(QUEUE_TABLE_NAME, BOOK_TYPE_NAME))
65-
cursor.callproc("dbms_aqadm.create_queue", (QUEUE_NAME, QUEUE_TABLE_NAME))
66-
cursor.callproc("dbms_aqadm.start_queue", (QUEUE_NAME,))
32+
# dequeue all existing messages to ensure the queue is empty, just so that
33+
# the results are consistent
34+
booksType = connection.gettype(BOOK_TYPE_NAME)
35+
book = booksType.newobject()
36+
options = connection.deqoptions()
37+
options.wait = cx_Oracle.DEQ_NO_WAIT
38+
messageProperties = connection.msgproperties()
39+
while connection.deq(QUEUE_NAME, options, messageProperties, book):
40+
pass
6741

6842
# enqueue a few messages
69-
booksType = connection.gettype(BOOK_TYPE_NAME)
7043
book1 = booksType.newobject()
7144
book1.TITLE = "The Fellowship of the Ring"
7245
book1.AUTHORS = "Tolkien, J.R.R."
@@ -76,7 +49,6 @@
7649
book2.AUTHORS = "Rowling, J.K."
7750
book2.PRICE = decimal.Decimal("7.99")
7851
options = connection.enqoptions()
79-
messageProperties = connection.msgproperties()
8052
for book in (book1, book2):
8153
print("Enqueuing book", book.TITLE)
8254
connection.enq(QUEUE_NAME, options, messageProperties, book)
@@ -88,4 +60,5 @@
8860
options.wait = cx_Oracle.DEQ_NO_WAIT
8961
while connection.deq(QUEUE_NAME, options, messageProperties, book):
9062
print("Dequeued book", book.TITLE)
63+
connection.commit()
9164

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#------------------------------------------------------------------------------
2+
# Copyright 2018, Oracle and/or its affiliates. All rights reserved.
3+
#------------------------------------------------------------------------------
4+
5+
#------------------------------------------------------------------------------
6+
# AdvancedQueuingNotification.py
7+
# This script demonstrates using advanced queuing notification. Once this
8+
# script is running, use another session to enqueue a few messages to the
9+
# "BOOKS" queue. This is most easily accomplished by running the
10+
# AdvancedQueuing sample.
11+
#
12+
# This script requires cx_Oracle 6.4 and higher.
13+
#------------------------------------------------------------------------------
14+
15+
from __future__ import print_function
16+
17+
import cx_Oracle
18+
import SampleEnv
19+
import threading
20+
import time
21+
22+
registered = True
23+
24+
def callback(message):
25+
global registered
26+
print("Message type:", message.type)
27+
if message.type == cx_Oracle.EVENT_DEREG:
28+
print("Deregistration has taken place...")
29+
registered = False
30+
return
31+
print("Queue name:", message.queueName)
32+
print("Consumer name:", message.consumerName)
33+
34+
connection = cx_Oracle.Connection(SampleEnv.MAIN_CONNECT_STRING, events = True)
35+
sub = connection.subscribe(namespace = cx_Oracle.SUBSCR_NAMESPACE_AQ,
36+
name = "BOOKS", callback = callback, timeout = 300)
37+
print("Subscription:", sub)
38+
print("--> Connection:", sub.connection)
39+
print("--> Callback:", sub.callback)
40+
print("--> Namespace:", sub.namespace)
41+
print("--> Protocol:", sub.protocol)
42+
print("--> Timeout:", sub.timeout)
43+
44+
while registered:
45+
print("Waiting for notifications....")
46+
time.sleep(5)
47+

samples/sql/SetupSamples.sql

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,13 @@ create or replace type &main_user..udt_Building as object (
6868
);
6969
/
7070

71+
create or replace type &main_user..udt_Book as object (
72+
Title varchar2(100),
73+
Authors varchar2(100),
74+
Price number(5,2)
75+
);
76+
/
77+
7178
-- create tables
7279

7380
create table &main_user..TestNumbers (
@@ -147,6 +154,15 @@ create table &main_user..Ptab (
147154
mydata varchar(20)
148155
);
149156

157+
-- create queue table and queues for demonstrating advanced queuing
158+
begin
159+
dbms_aqadm.create_queue_table('&main_user..BOOK_QUEUE',
160+
'&main_user..UDT_BOOK');
161+
dbms_aqadm.create_queue('&main_user..BOOKS', '&main_user..BOOK_QUEUE');
162+
dbms_aqadm.start_queue('&main_user..BOOKS');
163+
end;
164+
/
165+
150166
-- populate tables
151167

152168
begin

0 commit comments

Comments
 (0)