Thursday, September 11, 2014

Configuring Oracle streams replication

 Oracle streams can be considered as an alternative to GoldenGate for configuring data replication. This can save you considerable amount of downtime for huge database migrations.You can create a new database in advance and configure replication using oracle streams even for cross platform migrations. Once both databases are in sync, downtime can be coordinated with business for re-pointing the applications. Here are the high level steps for configuring oracle streams:

1.) Setup STREAMS user in both source and target database with required permissions. In our case source database is STRM_SRC and target database is STRM_TRG.

create table STREAMTST.STREAM_TST as select 1 dummy from dual;

create tablespace STREAMS_TBS
datafile '/oracle/data/ora04/STRM_TRG/streams_tbs_01.dbf' size 500M
LOGGING
ONLINE
EXTENT MANAGEMENT LOCAL AUTOALLOCATE
BLOCKSIZE 8K
SEGMENT SPACE MANAGEMENT AUTO
FLASHBACK ON;

CREATE USER STRMADMIN IDENTIFIED BY Summerend2014
DEFAULT TABLESPACE STREAMS_TBS TEMPORARY TABLESPACE TEMP_T01;

alter user STRMADMIN quota unlimited on STREAMS_TBS;

alter user STRMADMIN default role all;

grant appl_connect to STRMADMIN;

grant dba to STRMADMIN;

EXEC DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE('STRMADMIN');

CREATE DIRECTORY STRMDIR AS '/exports/pmxexpdp';

BEGIN
DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(
GRANTEE => 'STRMADMIN',
FILE_NAME =>'streams_admin_privs.sql',
DIRECTORY_NAME => 'STRMDIR');
END;
/

2.) Setup database link and queue table in source database STRM_SRC after login as STRADMIN user -

CREATE DATABASE LINK STRM_TRG CONNECT TO STRMADMIN IDENTIFIED BY Summerend2014  USING 'STRM_TRG';

BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(QUEUE_NAME=>'STREAMS_QUEUE',
QUEUE_TABLE=>'STREAMS_QUEUE_TABLE',
QUEUE_USER=>'STRMADMIN',
STORAGE_CLAUSE=>'TABLESPACE STREAMS_TBS');
END;
/


3.) Setup rules to capture transaction from redo log for particular schema -

BEGIN
DBMS_STREAMS_ADM.ADD_SCHEMA_RULES (
SCHEMA_NAME =>'STREAMTST',
STREAMS_TYPE =>'CAPTURE',
STREAMS_NAME=>'STRMADMIN_CAPTURE',
QUEUE_NAME=>'STRMADMIN.STREAMS_QUEUE',
INCLUDE_DML=>TRUE,
INCLUDE_DDL=>TRUE);
END;
/


4.) Add propagation rules using the Oracle-provided PL/SQL procedure
DBMS_STREAMS_ADM.ADD_SCHEMA_PROPAGATION_RULES:

BEGIN
DBMS_STREAMS_ADM.ADD_SCHEMA_PROPAGATION_RULES (
SCHEMA_NAME=>'STREAMTST',
STREAMS_NAME=>'STRMADMIN_PROPAGATE',
SOURCE_QUEUE_NAME=>'STRMADMIN.STREAMS_QUEUE',
DESTINATION_QUEUE_NAME=> 'STRMADMIN.STREAMS_QUEUE@STRM_TRG',
SOURCE_DATABASE=>'STRM_SRC',
INCLUDE_DML=>TRUE,
INCLUDE_DDL=>TRUE,
QUEUE_TO_QUEUE=>TRUE);
END;



5.) Login to target database STRM_TRG as the Streams administrator, create the Streams capture queue:

BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(QUEUE_NAME=>'STREAMS_QUEUE',
QUEUE_TABLE=>'STREAMS_QUEUE_TABLE',
QUEUE_USER=>'STRMADMIN',
STORAGE_CLAUSE=>'TABLESPACE STREAMS_TBS');
END;
/

6.) Connect as the Streams administrator, and add the schema apply
rules:

BEGIN
DBMS_STREAMS_ADM.ADD_SCHEMA_RULES (
SCHEMA_NAME =>'STREAMTST',
STREAMS_TYPE =>'APPLY',
STREAMS_NAME=>'STRMADMIN_APPLY',
QUEUE_NAME=>'STRMADMIN.STREAMS_QUEUE',
INCLUDE_DML=>TRUE,
INCLUDE_DDL=>TRUE);
END;
/

7.) Determine and obtain the current SCN from the source database.
To ensure that no data is lost, this step is to be performed before
the target copy is made. This SCN will be used as a starting point
for the capture process to begin the streaming activity.

set numwidth 18
SELECT TABLE_OWNER, TABLE_NAME, SCN from DBA_CAPTURE_PREPARED_TABLES;

8. After the data has been propagated, it needs to be applied to the
appropriate schema. Identify the user in the target database that will
apply the changes
EXEC DBMS_APPLY_ADM.ALTER_APPLY(apply_name=>'STRMADMIN_APPLY',apply_user=>'STRMADMIN');

9.) Set the SCN on the target database to allow the apply process to start the schema and table level instantiation at this SCN.
BEGIN
DBMS_APPLY_ADM.SET_SCHEMA_INSTANTIATION_SCN (
source_schema_name=> 'STREAMTST',
source_database_name=>'STRM_SRC',
instantiation_scn=> 12115252424484);
END;
/

BEGIN DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name=> 'STREAMTST.STREAM_TST',
source_database_name=>'STRM_SRC',
instantiation_SCN=>12115252424484);
END;

10.) Start the apply process, using the following PL/SQL package:
 EXEC DBMS_APPLY_ADM.START_APPLY( apply_name => 'STRMADMIN_APPLY');

11.) Enable propagation schedule at the source database. Connect as the Streams administrator and execute the following PL/SQL
procedure:
 EXEC DBMS_PROPAGATION_ADM.START_PROPAGATION('STRMADMIN_PROPAGATE') ;

12.) The following procedure starts the capture process, mines redologs, and enqueues the mined redo information into the associated
queue:

 EXEC DBMS_CAPTURE_ADM.START_CAPTURE(capture_name=>'STRMADMIN_CAPTURE');


Now test if the changes are being replicated or not. First perform transactions in source database:

STRM_SRC> truncate table STREAMTST.stream_tst;

Table truncated.

STRM_SRC>
STRM_SRC> create table STREAMTST.stream_tst01 as select 1 dummy from dual;

Table created.


STRM_SRC> insert into STREAMTST.stream_tst (select * from STREAMTST.stream_tst);

0 rows created.

STRM_SRC> insert into STREAMTST.stream_tst01 (select * from STREAMTST.stream_tst01);

1 row created.

STRM_SRC> /

2 rows created.

STRM_SRC> /

4 rows created.

STRM_SRC> /

8 rows created.

STRM_SRC> /

16 rows created.

STRM_SRC> /

32 rows created.

STRM_SRC> commit;

Commit complete.

STRM_SRC> insert into STREAMTST.stream_tst01 (select * from STREAMTST.stream_tst01);

64 rows created.

STRM_SRC> /

128 rows created.

STRM_SRC> /

256 rows created.

STRM_SRC> /

512 rows created.

STRM_SRC> commit;

Commit complete.

STRM_SRC> select count(1) from STREAMTST.stream_tst01;

  COUNT(1)
----------
      1024

STRM_SRC> select count(1) from STREAMTST.stream_tst;

  COUNT(1)
----------
         0


Now check in target database :
SQL> select count(1) from STREAMTST.STREAM_TST;

  COUNT(1)
----------
      8192

SQL> /

  COUNT(1)
----------
         0

SQL> /

  COUNT(1)
----------
         0

SQL> select count(1) from STREAMTST.STREAM_TST01;

  COUNT(1)
----------
         1

SQL>
SQL> /

  COUNT(1)
----------
        64

SQL> /

  COUNT(1)
----------
      1024

SQL>

Here are some useful monitoring scripts which can be used for day today streams troubleshooting:
If you want to check on the source database which rules are implemented for Oracle Streams replication you can check the following dictionary data views:

select * from dba_streams_rules
select * from  dba_streams_schema_rules
select * from dba_streams_table_rules

--Check the state of the apply process and make sure it is enabled:
SELECT apply_name, apply_captured, status, rule_set_name
FROM DBA_APPLY;


--Verify that the apply process has not dequeued any events:
SELECT  APPLY_NAME, STATE,  TOTAL_MESSAGES_DEQUEUED
FROM V$STREAMS_APPLY_READER;


--Verify the destination queue is correct
SELECT PROPAGATION_NAME, DESTINATION_QUEUE_OWNER ||'.'||
DESTINATION_QUEUE_NAME ||'@'||  DESTINATION_DBLINK "Destination Queue" ,ERROR_MESSAGE,ERROR_DATE
FROM DBA_PROPAGATION AAA;


--Check the propagation schedule time and check for failures
SELECT TO_CHAR(s.START_DATE, 'HH24:MI:SS MM/DD/YY') START_DATE,
s.PROPAGATION_WINDOW, s.NEXT_TIME,  s.LATENCY,
DECODE(s.SCHEDULE_DISABLED,'Y','Disabled','N','Enabled') SCHEDULE_DISABLED,
s.PROCESS_NAME, s.FAILURES
FROM DBA_QUEUE_SCHEDULES s, DBA_PROPAGATION p
WHERE p.DESTINATION_DBLINK = s.DESTINATION
AND s.SCHEMA = p.SOURCE_QUEUE_OWNER
AND s.QNAME = p.SOURCE_QUEUE_NAME;


--Query the total number of events propagated:
SELECT SCHEMA, QNAME, DESTINATION, TOTAL_NUMBER
FROM DBA_QUEUE_SCHEDULES;

 --Check the proper rules are created for global, schema and table and at each level (capture, propagation, apply):
SELECT rule_name,rule_set_rule_enabled
FROM dba_rule_set_rules
WHERE rule_set_name = ''
and rule_name = '';

SELECT rule_name,rule_condition
FROM dba_rules
WHERE rule_name = '';

-- Verify that the apply process has not dequeued any events:
SELECT  APPLY_NAME, STATE,  TOTAL_MESSAGES_DEQUEUED
FROM V$STREAMS_APPLY_READER;

--Query the total number of events propagated:
SELECT SCHEMA, QNAME, DESTINATION, TOTAL_NUMBER
FROM DBA_QUEUE_SCHEDULES;

--Check the apply process latency:
SELECT (hwm_time-hwm_message_create_time)* 86400 "Latency in Seconds",
hwm_message_create_time "Event Creation",
hwm_time "Apply Time",
hwm_message_number "Applied Message #"
FROM v$STREAMS_APPLY_COORDINATOR;

--Check the Error Queue:
SELECT apply_name, source_database,local_transaction_id, error_message
FROM DBA_APPLY_ERROR;

--Check for any defined conflict resolution:
COLUMN object_owner      FORMAT a15
COLUMN object_name       FORMAT a15
COLUMN method_name       FORMAT a20
COLUMN column_name       FORMAT a15
COLUMN resolution_column FORMAT a60
SELECT object_owner,object_name,method_name,resolution_column,column_name
FROM dba_apply_conflict_columns;

--Check object instantiated SCN:
SELECT * FROM dba_apply_instantiated_objects;

EXEC DBMS_APPLY_ADM.SET_KEY_COLUMNS('<owner>.<table_name>','<col1>,<col2>,....');
EXEC DBMS_APPLY_ADM.EXECUTE_ALL_ERRORS;
EXEC DBMS_APPLY_ADM.START_APPLY('<apply name>');


--- stream capture status
select
capture_name,
capture_message_number,
enqueue_message_number,
state,
total_prefilter_discarded,
total_prefilter_kept,
total_messages_captured,
total_messages_enqueued
from gv$streams_capture;

set pagesize 1000
col first_scn format 999999999999999999
col next_scn format 999999999999999999
alter session set nls_date_format='dd-mon-yyyy hh24:mi:ss';
select source_database,thread#,sequence#,name,modified_time,first_scn,next_scn,dictionary_begin,dictionary_end
 from dba_registered_archived_log where &scn between first_scn and next_scn;

--stream apply status - corelated with the above query
select apply_name,state,
total_messages_dequeued,
total_messages_spilled,
dequeued_message_number
from gv$streams_apply_reader@orclstream;

------- *************** streams monitor script ***********************
COLUMN CAPTURE_NAME      FORMAT a15
COLUMN QUEUE_NAME       FORMAT a15
COLUMN PROPAGATION_NAME       FORMAT a15
COLUMN STATUS       FORMAT a15
COLUMN apply_name       FORMAT a15
COLUMN error_message FORMAT a35

prompt Redo Log Scanning Latency for Each Capture Process
SELECT CAPTURE_NAME,
((SYSDATE - CAPTURE_MESSAGE_CREATE_TIME)*86400) LATENCY_SECONDS,
((SYSDATE - CAPTURE_TIME)*86400) LAST_STATUS,
TO_CHAR(CAPTURE_TIME, 'HH24:MI:SS MM/DD/YY') CAPTURE_TIME,
TO_CHAR(CAPTURE_MESSAGE_CREATE_TIME, 'HH24:MI:SS MM/DD/YY') CREATE_TIME
FROM V$STREAMS_CAPTURE;

prompt "Capture process that are disabled"
SELECT CAPTURE_NAME, QUEUE_NAME, RULE_SET_NAME, NEGATIVE_RULE_SET_NAME, STATUS
FROM DBA_CAPTURE where capture_name='' and status<>'ENABLED';
prompt "Stream capture process that are not runing normally (state is not ok)"
select state from V$STREAMS_CAPTURE where
 state in ('SHUTTING DOWN','INITIALIZING','ABORTING','PAUSED FOR FLOW
CONTROL','DICTIONARY INITIALIZATION');

prompt "if a subscriber has a high count
for total_spilled_msg, then that subscriber is not dequeuing messages
fast enough from the queue buffer. Spilling messages to disk has a
negative impact on the performance of your Streams environment."

--prompt "check scn"
 --select * from DBA_APPLY_INSTANTIATED_OBJECTS@ORCLSTREAM;

prompt "Errors in apply? Is the apply process running? Remember to re-execute pending errors before restarting the apply"
--First check sequence
select apply_name,queue_name,status,error_number,error_message
from dba_apply@orclstream
where status<>'ENABLED' OR error_number is not null ;
-- if there are errors in the above query, you should check below query for details
--select * from dba_apply_error@orclstream ;
--execute DBMS_APPLY_ADM.EXECUTE_All_ERRORS;
--select * from dba_apply_error
--execute DBMS_APPLY_ADM.START_APPLY(apply_name => 'APPLY_MTS');
--select * from DBA_APPLY_PROGRESS

prompt "errors in capture process or disabled capture processes"
select CAPTURE_NAME,QUEUE_NAME,STATUS,CAPTURED_SCN,APPLIED_SCN,ERROR_NUMBER,ERROR_MESSAGE
FROM DBA_CAPTURE
where status<>'ENABLED' OR CAPTURED_SCN-APPLIED_SCN<>0 OR ERROR_NUMBER IS NOT NULL;

---- setting trace on for the capture process:
 -----execute dbms_capture_adm.set_parameter('&CAPTURE_NAME','trace_level','2');
prompt "Displaying Information About Propagations that Send Buffered Messages"
SELECT p.PROPAGATION_NAME,
s.QUEUE_SCHEMA,
s.QUEUE_NAME,
s.DBLINK,
s.SCHEDULE_STATUS,
P.STATUS,
P.ERROR_MESSAGE,
P.ERROR_DATE
FROM DBA_PROPAGATION p, V$PROPAGATION_SENDER s
WHERE p.DESTINATION_DBLINK = s.DBLINK AND
p.SOURCE_QUEUE_OWNER = s.QUEUE_SCHEMA AND
p.SOURCE_QUEUE_NAME  = s.QUEUE_NAME
and (p.status<>'ENABLED' OR
s.schedule_status<>'SCHEDULE ENABLED' OR ERROR_MESSAGE IS NOT NULL);
--
prompt "Displaying the Schedule for a Propagation Job"
SELECT DISTINCT
p.PROPAGATION_NAME,
TO_CHAR(s.START_DATE, 'HH24:MI:SS MM/DD/YY') START_DATE,
s.PROPAGATION_WINDOW,
s.NEXT_TIME,
s.LATENCY,
DECODE(s.SCHEDULE_DISABLED,
'Y', 'Disabled',
'N', 'Enabled') SCHEDULE_DISABLED,
s.PROCESS_NAME,
s.FAILURES
FROM DBA_QUEUE_SCHEDULES s, DBA_PROPAGATION p
WHERE
p.DESTINATION_DBLINK = s.DESTINATION
AND s.SCHEMA = p.SOURCE_QUEUE_OWNER
AND s.QNAME = p.SOURCE_QUEUE_NAME
and (s.SCHEDULE_DISABLED='Y'
or S.FAILURES>0);

prompt "Apply process errors:"
SELECT s.SUBSCRIBER_NAME,
q.QUEUE_SCHEMA,
q.QUEUE_NAME,
s.LAST_DEQUEUED_SEQ,
s.NUM_MSGS,
s.TOTAL_SPILLED_MSG ,
a.error_number,
a.error_message ,
a.status
FROM V$BUFFERED_QUEUES@orclstream q, V$BUFFERED_SUBSCRIBERS@orclstream s, DBA_APPLY@orclstream a
WHERE q.QUEUE_ID = s.QUEUE_ID AND
s.SUBSCRIBER_ADDRESS IS NULL AND
s.SUBSCRIBER_NAME = a.APPLY_NAME
and (s.total_spilled_msg>0 or A.status<>'ENABLED' OR A.ERROR_NUMBER IS NOT NULL);

prompt "Propagations Dequeuing Messages from Each Buffered Queue"
SELECT p.PROPAGATION_NAME,
s.SUBSCRIBER_ADDRESS,
s.CURRENT_ENQ_SEQ,
s.LAST_BROWSED_SEQ,
s.LAST_DEQUEUED_SEQ,
s.NUM_MSGS,
s.TOTAL_SPILLED_MSG
FROM DBA_PROPAGATION p, V$BUFFERED_SUBSCRIBERS s, V$BUFFERED_QUEUES q
WHERE q.QUEUE_ID = s.QUEUE_ID AND
p.SOURCE_QUEUE_OWNER = q.QUEUE_SCHEMA AND
p.SOURCE_QUEUE_NAME = q.QUEUE_NAME AND
p.DESTINATION_DBLINK = s.SUBSCRIBER_ADDRESS
and  s.TOTAL_SPILLED_MSG>0;

prompt "The next 2 queries should return the same number of enqueue_message and dequeued_message (respectively)"
select
capture_name, state,
capture_message_number,
enqueue_message_number,
total_prefilter_discarded,
total_prefilter_kept,
total_messages_captured,
total_messages_enqueued
from gv$streams_capture;

--stream apply status - corelated with the above query
select apply_name,state,
total_messages_dequeued,
total_messages_spilled,
dequeued_message_number
from gv$streams_apply_reader@orclstream;

------- *************** end streams monitor script ***********************

SELECT * FROM DBA_LOGMNR_PURGED_LOG;
SELECT * FROM DBA_CAPTURE_PARAMETERS;
SELECT * FROM DBA_CAPTURE_EXTRA_ATTRIBUTES
SELECT * FROM DBA_QUEUE_TABLES order by owner, queue_table
SELECT * FROM DBA_QUEUES order by owner
SELECT * FROM DBA_APPLY_DML_HANDLERS
select * from DBA_QUEUE_SCHEDULES
select * from DBA_STREAMS_COLUMNS;
select * from DBA_STREAMS_ADMINISTRATOR;
select * from DBA_STREAMS_RULES;
select * from DBA_STREAMS_TABLE_RULES;
select * from SYS.DBA_STREAMS_UNSUPPORTED;

-- join between all info on capture source database
SELECT * FROM DBA_STREAMS_RULES dsr
inner join dba_propagation dp on dp.rule_set_name=dsr.rule_set_name


SELECT distinct
destination_queue_name,replace(object_name,'PRS_DEPENDECY','PRS_DEPENDENCY')
 object_name,replace(streams_name,'PROPAGATION','APPLY') STREAM_NAME
FROM DBA_STREAMS_RULES dsr
inner join dba_propagation dp on dp.rule_set_name=dsr.rule_set_name
INNER JOIN DBA_CAPTURE_PREPARED_TABLES DCPT ON dcpt.table_name=replace(object_name,'PRS_DEPENDECY','PRS_DEPENDENCY')

begin
for rc in (
select propagation_name from dba_propagation)
loop
DBMS_PROPAGATION_ADM.START_PROPAGATION(rc.propagation_name);
end loop;
end;

/************Deleting invalid oracle streams****************/
DBMS_STREAMS_ADM.REMOVE_RULE(
   rule_name         IN  VARCHAR2,
   streams_type      IN  VARCHAR2,
   streams_name      IN  VARCHAR2,
   drop_unused_rule  IN  BOOLEAN  DEFAULT TRUE,
   inclusion_rule    IN  BOOLEAN  DEFAULT TRUE);





begin
dbms_streams_adm.remove_rule(rulename);
end;

/********************************************************/

Other useful scripts:

View Message Notification:
COLUMN STREAMS_NAME HEADING 'Messaging|Client' FORMAT A10
COLUMN QUEUE_OWNER HEADING 'Queue|Owner' FORMAT A5
COLUMN QUEUE_NAME HEADING 'Queue Name' FORMAT A20
COLUMN NOTIFICATION_TYPE HEADING 'Notification|Type' FORMAT A15
COLUMN NOTIFICATION_ACTION HEADING 'Notification|Action' FORMAT A25

SELECT STREAMS_NAME,
       QUEUE_OWNER,
       QUEUE_NAME,
       NOTIFICATION_TYPE,
       NOTIFICATION_ACTION
  FROM DBA_STREAMS_MESSAGE_CONSUMERS
  WHERE NOTIFICATION_TYPE IS NOT NULL;

Determine consumer of each message in persistent queue:

COLUMN MSG_ID HEADING 'Message ID' FORMAT 9999
COLUMN MSG_STATE HEADING 'Message State' FORMAT A13
COLUMN CONSUMER_NAME HEADING 'Consumer' FORMAT A30
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;


View Contents of message in persisten queue:

SELECT qt.user_data.AccessNumber() "Numbers in Queue"
  FROM strmadmin.oe_q_table_any qt;
SELECT qt.user_data.AccessVarchar2() "Varchar2s in Queue"
   FROM strmadmin.oe_q_table_any qt;

Monitoring Buffered queues:
Buffered queues enable Oracle databases to optimize messages by storing them in the SGA instead of always storing them in a queue table.
Captured LCRs always are stored in buffered queues, but other types of messages can be stored in buffered queues or persistently in queue tables.
Messages in a buffered queue can spill from memory if they have been staged in the buffered queue for a period of time without being dequeued, or
if there is not enough space in memory to hold all of the messages. Messages that spill from memory are stored in the appropriate queue table.

Determine number of messages in buffered queue:
COLUMN QUEUE_SCHEMA HEADING 'Queue Owner' FORMAT A15
COLUMN QUEUE_NAME HEADING 'Queue Name' FORMAT A15
COLUMN MEM_MSG HEADING 'Messages|in Memory' FORMAT 99999999
COLUMN SPILL_MSGS HEADING 'Messages|Spilled' FORMAT 99999999
COLUMN NUM_MSGS HEADING 'Total Messages|in Buffered Queue' FORMAT 99999999

SELECT QUEUE_SCHEMA,
       QUEUE_NAME,
       (NUM_MSGS - SPILL_MSGS) MEM_MSG,
       SPILL_MSGS,
       NUM_MSGS
  FROM V$BUFFERED_QUEUES;

View captured process for LCRS in each buffered queue:
COLUMN SENDER_NAME HEADING 'Capture|Process' FORMAT A10
COLUMN SENDER_ADDRESS HEADING 'Sender Queue' FORMAT A15
COLUMN QUEUE_NAME HEADING 'Queue Name' FORMAT A10
COLUMN CNUM_MSGS HEADING 'Number|of LCRs|Enqueued' FORMAT 99999999
COLUMN LAST_ENQUEUED_MSG HEADING 'Last|Enqueued|LCR' FORMAT 9999999999
COLUMN MEMORY_USAGE HEADING 'Percent|Streams|Pool|Used' FORMAT 999
COLUMN PUBLISHER_STATE HEADING 'Publisher|State' FORMAT A10

SELECT SENDER_NAME,
       SENDER_ADDRESS,
       QUEUE_NAME,      
       CNUM_MSGS,
       LAST_ENQUEUED_MSG,
       MEMORY_USAGE,
       PUBLISHER_STATE
  FROM V$BUFFERED_PUBLISHERS;

Display information about propagations which send buffered messaged:
COLUMN PROPAGATION_NAME HEADING 'Propagation' FORMAT A15
COLUMN QUEUE_SCHEMA HEADING 'Queue|Owner' FORMAT A10
COLUMN QUEUE_NAME HEADING 'Queue|Name' FORMAT A15
COLUMN DBLINK HEADING 'Database|Link' FORMAT A10
COLUMN SCHEDULE_STATUS HEADING 'Schedule Status' FORMAT A20

SELECT p.PROPAGATION_NAME,
       s.QUEUE_SCHEMA,
       s.QUEUE_NAME,
       s.DBLINK,
       s.SCHEDULE_STATUS
  FROM DBA_PROPAGATION p, V$PROPAGATION_SENDER s
  WHERE p.SOURCE_QUEUE_OWNER      = s.QUEUE_SCHEMA AND
        p.SOURCE_QUEUE_NAME       = s.QUEUE_NAME AND
        p.DESTINATION_QUEUE_OWNER = s.DST_QUEUE_SCHEMA AND
        p.DESTINATION_QUEUE_NAME  = s.DST_QUEUE_NAME;

COLUMN PROPAGATION_NAME HEADING 'Propagation' FORMAT A15
COLUMN QUEUE_NAME HEADING 'Queue|Name' FORMAT A15
COLUMN DBLINK HEADING 'Database|Link' FORMAT A20
COLUMN TOTAL_MSGS HEADING 'Total|Messages' FORMAT 99999999
COLUMN TOTAL_BYTES HEADING 'Total|Bytes' FORMAT 999999999999

SELECT p.PROPAGATION_NAME,
       s.QUEUE_NAME,
       s.DBLINK,
       s.TOTAL_MSGS,
       s.TOTAL_BYTES
  FROM DBA_PROPAGATION p, V$PROPAGATION_SENDER s
  WHERE p.SOURCE_QUEUE_OWNER      = s.QUEUE_SCHEMA AND
        p.SOURCE_QUEUE_NAME       = s.QUEUE_NAME AND
        p.DESTINATION_QUEUE_OWNER = s.DST_QUEUE_SCHEMA AND
        p.DESTINATION_QUEUE_NAME  = s.DST_QUEUE_NAME;

Displaying Performance Statistics for Propagations that Send Buffered Messages:

COLUMN PROPAGATION_NAME HEADING 'Propagation' FORMAT A15
COLUMN QUEUE_NAME HEADING 'Queue|Name' FORMAT A13
COLUMN DBLINK HEADING 'Database|Link' FORMAT A9
COLUMN ELAPSED_DEQUEUE_TIME HEADING 'Dequeue|Time' FORMAT 99999999.99
COLUMN ELAPSED_PICKLE_TIME HEADING 'Pickle|Time' FORMAT 99999999.99
COLUMN ELAPSED_PROPAGATION_TIME HEADING 'Propagation|Time' FORMAT 99999999.99

SELECT p.PROPAGATION_NAME,
       s.QUEUE_NAME,
       s.DBLINK,
       (s.ELAPSED_DEQUEUE_TIME / 100) ELAPSED_DEQUEUE_TIME,
       (s.ELAPSED_PICKLE_TIME / 100) ELAPSED_PICKLE_TIME,
       (s.ELAPSED_PROPAGATION_TIME / 100) ELAPSED_PROPAGATION_TIME
  FROM DBA_PROPAGATION p, V$PROPAGATION_SENDER s
  WHERE p.SOURCE_QUEUE_OWNER      = s.QUEUE_SCHEMA AND
        p.SOURCE_QUEUE_NAME       = s.QUEUE_NAME AND
        p.DESTINATION_QUEUE_OWNER = s.DST_QUEUE_SCHEMA AND
        p.DESTINATION_QUEUE_NAME  = s.DST_QUEUE_NAME;

Viewing the Propagations Dequeuing Messages from Each Buffered Queue:

COLUMN PROPAGATION_NAME HEADING 'Propagation' FORMAT A11
COLUMN QUEUE_SCHEMA HEADING 'Queue|Owner' FORMAT A5
COLUMN QUEUE_NAME HEADING 'Queue|Name' FORMAT A5
COLUMN SUBSCRIBER_ADDRESS HEADING 'Subscriber|Address' FORMAT A15
COLUMN STARTUP_TIME HEADING 'Startup|Time' FORMAT A9
COLUMN CNUM_MSGS HEADING 'Cumulative|Messages' FORMAT 99999999
COLUMN TOTAL_DEQUEUED_MSG HEADING 'Total|Messages' FORMAT 99999999
COLUMN LAST_DEQUEUED_NUM HEADING 'Last|Dequeued|Message|Number' FORMAT 99999999

SELECT p.PROPAGATION_NAME,
       s.QUEUE_SCHEMA,
       s.QUEUE_NAME,
       s.SUBSCRIBER_ADDRESS,
       s.STARTUP_TIME,
       s.CNUM_MSGS,        
       s.TOTAL_DEQUEUED_MSG,
       s.LAST_DEQUEUED_NUM
FROM DBA_PROPAGATION p, V$BUFFERED_SUBSCRIBERS s
WHERE p.SOURCE_QUEUE_OWNER = s.QUEUE_SCHEMA AND
      p.SOURCE_QUEUE_NAME  = s.QUEUE_NAME AND
      p.PROPAGATION_NAME   = s.SUBSCRIBER_NAME AND
      s.SUBSCRIBER_ADDRESS LIKE '%' || p.DESTINATION_DBLINK;

Displaying Performance Statistics for Propagations That Receive Buffered Messages:

COLUMN SRC_QUEUE_NAME HEADING 'Source|Queue|Name' FORMAT A20
COLUMN SRC_DBNAME HEADING 'Source|Database' FORMAT A20
COLUMN ELAPSED_UNPICKLE_TIME HEADING 'Unpickle|Time' FORMAT 99999999.99
COLUMN ELAPSED_RULE_TIME HEADING 'Rule|Evaluation|Time' FORMAT 99999999.99
COLUMN ELAPSED_ENQUEUE_TIME HEADING 'Enqueue|Time' FORMAT 99999999.99

SELECT SRC_QUEUE_NAME,
       SRC_DBNAME,
       (ELAPSED_UNPICKLE_TIME / 100) ELAPSED_UNPICKLE_TIME,
       (ELAPSED_RULE_TIME / 100) ELAPSED_RULE_TIME,
       (ELAPSED_ENQUEUE_TIME / 100) ELAPSED_ENQUEUE_TIME
  FROM V$PROPAGATION_RECEIVER;

Viewing the Apply Processes Dequeuing Messages from Each Buffered Queue:
COLUMN SUBSCRIBER_NAME HEADING 'Apply Process' FORMAT A16
COLUMN QUEUE_SCHEMA HEADING 'Queue|Owner' FORMAT A5
COLUMN QUEUE_NAME HEADING 'Queue|Name' FORMAT A5
COLUMN STARTUP_TIME HEADING 'Startup|Time' FORMAT A9
COLUMN CNUM_MSGS HEADING 'Cumulative|Messages' FORMAT 99999999
COLUMN TOTAL_DEQUEUED_MSG HEADING 'Number of|Dequeued|Messages'
  FORMAT 99999999
COLUMN LAST_DEQUEUED_NUM HEADING 'Last|Dequeued|Message|Number' FORMAT 99999999

SELECT s.SUBSCRIBER_NAME,
       q.QUEUE_SCHEMA,
       q.QUEUE_NAME,
       s.STARTUP_TIME,
       s.CNUM_MSGS,        
       s.TOTAL_DEQUEUED_MSG,
       s.LAST_DEQUEUED_NUM
FROM V$BUFFERED_QUEUES q, V$BUFFERED_SUBSCRIBERS s, DBA_APPLY a
WHERE q.QUEUE_ID = s.QUEUE_ID AND
      s.SUBSCRIBER_ADDRESS IS NULL AND
      s.SUBSCRIBER_NAME = a.APPLY_NAME;