Oracle Streams were introduced in oracle 9.2. Oracle Streams are a generic mechanism to replicate data between one database to another database. The processing of streams is divided into three main processes(Capture, Staging and Apply).
This article provides step by step instruction to build the three way streams. This article is tested in oracle 9.2.0.8. We have three database DB1, DB2, DB3. The steam is setup on dept table for scott schema on all three instance. Any changes(DML/DDL) made on any of the database will be replicate to other two database.
Instance SetupThe below init parmeters needs to be set on intance level for all three database(DB1,DB2,DB3).ALTER SYSTEM SET JOB_QUEUE_PROCESSES = 20;
ALTER SYSTEM SET AQ_TM_PROCESSES = 2;
ALTER SYSTEM SET GLOBAL_NAMES = TRUE;
ALTER SYSTEM SET COMPATIBLE='9.2.0' SCOPE = SPFILE;
ALTER SYSTEM SET LOG_PARALLELISM=1 SCOPE = SPFILE;
SHUTDOWN IMMEDIATE;
STARTUP;
Stream admin Setup on DB1CONNECT SYS/password@db1 AS SYSDBA;
CREATE TABLESPACE LOGMNRTS DATAFILE 'C:/ORACLE/ORADATA/DB1/logmnrts.dbf' SIZE 25M
AUTOEXTEND ON MAXSIZE UNLIMITED
/
BEGIN
DBMS_LOGMNR_D.SET_TABLESPACE('LOGMNRTS');
END;
/
create user strmadmin identified by strmadmin
/
alter user strmadmin default tablespace LOGMNRTS
/
grant connect,resource,aq_administrator_role,dba to strmadmin
/
GRANT SELECT ANY DICTIONARY TO STRMADMIN;
GRANT EXECUTE ON DBMS_AQ TO STRMADMIN;
GRANT EXECUTE ON DBMS_AQADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_FLASHBACK TO STRMADMIN;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_APPLY_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_RULE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_PROPAGATION_ADM TO STRMADMIN;
BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'ENQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/
BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'DEQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/
BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'MANAGE_ANY',
grantee => 'STRMADMIN',
admin_option => TRUE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_EVALUATION_CONTEXT,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/
Stream admin Setup on DB2CONNECT SYS/PASSWORD@DB2 AS SYSDBA;
CREATE TABLESPACE LOGMNRTS DATAFILE 'C:/ORACLE/ORADATA/DB2/logmnrts.dbf' SIZE 25M
AUTOEXTEND ON MAXSIZE UNLIMITED
/
BEGIN
DBMS_LOGMNR_D.SET_TABLESPACE('LOGMNRTS');
END;
/
create user strmadmin identified by strmadmin
/
alter user strmadmin default tablespace LOGMNRTS
/
grant connect,resource,aq_administrator_role,dba to strmadmin
/
GRANT SELECT ANY DICTIONARY TO STRMADMIN;
GRANT EXECUTE ON DBMS_AQ TO STRMADMIN;
GRANT EXECUTE ON DBMS_AQADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_FLASHBACK TO STRMADMIN;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_APPLY_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_RULE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_PROPAGATION_ADM TO STRMADMIN;
BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'ENQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/
BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'DEQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/
BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'MANAGE_ANY',
grantee => 'STRMADMIN',
admin_option => TRUE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_EVALUATION_CONTEXT,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/
Stream admin Setup on DB3CONNECT SYS/password@db3 AS SYSDBA;
CREATE TABLESPACE LOGMNRTS DATAFILE 'C:/ORACLE/ORADATA/DB3/logmnrts.dbf' SIZE 25M
AUTOEXTEND ON MAXSIZE UNLIMITED
/
BEGIN
DBMS_LOGMNR_D.SET_TABLESPACE('LOGMNRTS');
END;
/
create user strmadmin identified by strmadmin
default tablespace LOGMNRTS
/
grant connect,resource,aq_administrator_role,dba to strmadmin
/
GRANT SELECT ANY DICTIONARY TO STRMADMIN;
GRANT EXECUTE ON DBMS_AQ TO STRMADMIN;
GRANT EXECUTE ON DBMS_AQADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_FLASHBACK TO STRMADMIN;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_APPLY_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_RULE_ADM TO STRMADMIN;
GRANT EXECUTE ON DBMS_PROPAGATION_ADM TO STRMADMIN;
BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'ENQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/
BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'DEQUEUE_ANY',
grantee => 'STRMADMIN',
admin_option => FALSE);
END;
/
BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'MANAGE_ANY',
grantee => 'STRMADMIN',
admin_option => TRUE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE_SET,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.ALTER_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_RULE,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ANY_EVALUATION_CONTEXT,
grantee => 'STRMADMIN',
grant_option => TRUE);
END;
/
Define and start the Apply Process in DB1connect strmadmin/strmadmin@db1;
CREATE DATABASE LINK DB2
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db2)
)
)'
/
CREATE DATABASE LINK DB3
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db3)
)
)'
/
begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_IN_QUEUE',
queue_user => 'STRMADMIN' );
end;
/
begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_OUT_QUEUE',
queue_user => 'STRMADMIN' );
end;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB2',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB2');
END;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB3',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB3');
END;
/
BEGIN
dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB2',
apply_user => 'SCOTT');
dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB3',
apply_user => 'SCOTT');
END;
/
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB2',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB3',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );
END;
/
declare
v_started number;
begin
select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB2';
if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB2' );
end if;
select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB3';
if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB3' );
end if;
end;
/
Define and start the Apply Process in DB2CONNECT strmadmin/strmadmin@DB2;
CREATE DATABASE LINK DB1
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db1)
)
)'
/
CREATE DATABASE LINK DB3
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db3)
)
)'
/
begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_IN_QUEUE',
queue_user => 'STRMADMIN' );
end;
/
begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_OUT_QUEUE',
queue_user => 'STRMADMIN' );
end;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB1',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB1');
END;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB3',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB3');
END;
/
BEGIN
dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB1',
apply_user => 'SCOTT');
dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB3',
apply_user => 'SCOTT');
END;
/
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB1',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB3',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );
END;
/
declare
v_started number;
begin
select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB1';
if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB1' );
end if;
select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB3';
if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB3' );
end if;
end;
/
Define and start the Apply Process in DB3CONNECT strmadmin/strmadmin@DB3;
CREATE DATABASE LINK DB1
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db1)
)
)'
/
CREATE DATABASE LINK DB2
CONNECT TO STRMADMIN IDENTIFIED BY STRMADMIN USING
' (DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = NYC-LAPTOP05)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = db2)
)
)'
/
begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_IN_QUEUE',
queue_user => 'STRMADMIN' );
end;
/
begin
dbms_streams_adm.set_up_queue(
queue_table => 'DB_STREAMS_QUEUE_TABLE',
storage_clause => 'TABLESPACE TOOLS',
queue_name => 'DB_STREAMS_OUT_QUEUE',
queue_user => 'STRMADMIN' );
end;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB1',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB1');
END;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'APPLY',
streams_name => 'DB_APPLY_DB2',
queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB2');
END;
/
BEGIN
dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB1',
apply_user => 'SCOTT');
dbms_apply_adm.alter_apply(
apply_name => 'DB_APPLY_DB2',
apply_user => 'SCOTT');
END;
/
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB1',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'DB_APPLY_DB2',
parameter => 'DISABLE_ON_ERROR',
value => 'N' );
END;
/
declare
v_started number;
begin
select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB1';
if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB1' );
end if;
select decode(status,'ENABLED',1,0) into v_started
from dba_apply
where apply_name = 'DB_APPLY_DB2';
if ( v_started = 0 ) then
dbms_apply_adm.start_apply( apply_name => 'DB_APPLY_DB2' );
end if;
end;
/
Setup Propagation Rule for DB1
CONNECT SYS/PASSWORD@DB1 AS SYSDBA
ALTER TABLE SCOTT.DEPT DROP SUPPLEMENTAL LOG GROUP log_group_dept_pk;
ALTER TABLE SCOTT.DEPT ADD SUPPLEMENTAL LOG GROUP log_group_dept_pk (DEPTNO,DNAME,LOC) ALWAYS;
CONNECT STRMADMIN/STRMADMIN@DB1
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'CAPTURE',
streams_name => 'DB_CAPTURE',
queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB1');
END;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB2',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB2',
include_dml => true,
include_ddl => true,
source_database => 'DB1');
END;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB3',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB3',
include_dml => true,
include_ddl => true,
source_database => 'DB1');
END;
/
Setup Propagation Rule for DB2
CONNECT SYS/PASSWORD@DB2 AS SYSDBA
ALTER TABLE SCOTT.DEPT DROP SUPPLEMENTAL LOG GROUP log_group_dept_pk;
ALTER TABLE SCOTT.DEPT ADD SUPPLEMENTAL LOG GROUP log_group_dept_pk (DEPTNO,DNAME,LOC) ALWAYS;
CONNECT STRMADMIN/STRMADMIN@DB2
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'CAPTURE',
streams_name => 'DB_CAPTURE',
queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB2');
END;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB1',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB1',
include_dml => true,
include_ddl => true,
source_database => 'DB2');
END;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB3',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB3',
include_dml => true,
include_ddl => true,
source_database => 'DB2');
END;
/
Setup Propagation Rule for DB3
CONNECT SYS/PASSWORD@DB3 AS SYSDBA
ALTER TABLE SCOTT.DEPT DROP SUPPLEMENTAL LOG GROUP log_group_dept_pk;
ALTER TABLE SCOTT.DEPT ADD SUPPLEMENTAL LOG GROUP log_group_dept_pk (DEPTNO,DNAME,LOC) ALWAYS;
CONNECT STRMADMIN/STRMADMIN@DB3
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'SCOTT.DEPT',
streams_type => 'CAPTURE',
streams_name => 'DB_CAPTURE',
queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
include_dml => true,
include_ddl => true,
source_database => 'DB3');
END;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB1',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB1',
include_dml => true,
include_ddl => true,
source_database => 'DB3');
END;
/
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'SCOTT.DEPT',
streams_name => 'DB_PROPAGATE_DB2',
source_queue_name => 'STRMADMIN.DB_STREAMS_OUT_QUEUE',
destination_queue_name => 'STRMADMIN.DB_STREAMS_IN_QUEUE@DB2',
include_dml => true,
include_ddl => true,
source_database => 'DB3');
END;
/
Setup Instantiation SCN for DB1
CONNECT STRMADMIN/STRMADMIN@DB1
set echo on
set feedback on
set serverout on
declare
iscn number;
begin
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_OUTPUT.PUT_LINE('Instantiation SCN is: 'iscn );
end;
/
CONNECT STRMADMIN/STRMADMIN@DB2
BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB1',
instantiation_scn => iscn);
END;
/
CONNECT STRMADMIN/STRMADMIN@DB3
BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB1',
instantiation_scn => iscn);
END;
/
CONNECT STRMADMIN/STRMADMIN@DB1
EXEC DBMS_CAPTURE_ADM.START_CAPTURE( CAPTURE_NAME => 'DB_CAPTURE')
/
Setup Instantiation SCN for DB2
CONNECT STRMADMIN/STRMADMIN@DB2
set echo on
set feedback on
set serverout on
declare
iscn number;
begin
iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_OUTPUT.PUT_LINE('Instantiation SCN is: 'iscn );
end;
/
CONNECT STRMADMIN/STRMADMIN@DB1
BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB2',
instantiation_scn => vscn);
END;
/
CONNECT STRMADMIN/STRMADMIN@DB3
BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB2',
instantiation_scn => vscn);
END;
/
CONNECT STRMADMIN/STRMADMIN@DB2
EXEC DBMS_CAPTURE_ADM.START_CAPTURE( CAPTURE_NAME => 'DB_CAPTURE')
/
Setup Instantiation SCN for DB3
CONNECT STRMADMIN/STRMADMIN@DB3
set echo on
set feedback on
set serverout on
declare
iscn number;
begin
iscn :=
DBMS_OUTPUT.PUT_LINE('Instantiation SCN is: 'iscn );
end;
/
CONNECT STRMADMIN/STRMADMIN@DB1
BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB3',
instantiation_scn => vscn);
END;
/
CONNECT STRMADMIN/STRMADMIN@DB2
BEGIN
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
source_object_name => 'SCOTT.DEPT',
source_database_name => 'DB3',
instantiation_scn => vscn);
END;
/
CONNECT STRMADMIN/STRMADMIN@DB3
EXEC DBMS_CAPTURE_ADM.START_CAPTURE( CAPTURE_NAME => 'DB_CAPTURE')
/
Stream Testing
Scenario 1
Here is the records in dept table for DB1 instance
scott@DB1.US.ORACLE.COM> select * from dept;
DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
scott@DB1.US.ORACLE.COM>
Now insert one record in DB1 instance and check this record is replicated in DB2, DB3 instance.
scott@DB1.US.ORACLE.COM> insert into
2 dept values(50,'IT','HOUSTON');
1 row created.
scott@DB1.US.ORACLE.COM> COMMIT;
Commit complete.
scott@DB1.US.ORACLE.COM>
scott@DB2.US.ORACLE.COM> SELECT * FROM DEPT;
DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
50 IT HOUSTON
scott@DB2.US.ORACLE.COM>
scott@DB3.US.ORACLE.COM> SELECT * FROM DEPT;
DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
50 IT HOUSTON
scott@DB3.US.ORACLE.COM>
Now the changes in DB1 is replicated to DB2, DB3.
Scenario 2
Let us update the record in DB2 and see the changes are replicating to DB1, DB3.
scott@DB2.US.ORACLE.COM> UPDATE dept
2 set dname='Testing'
3 where deptno=50;
1 row updated.
scott@DB2.US.ORACLE.COM> commit;
Commit complete.
scott@DB2.US.ORACLE.COM>
scott@DB1.US.ORACLE.COM> SELECT * FROM DEPT;
DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
50 Testing HOUSTON
scott@DB1.US.ORACLE.COM>
scott@DB3.US.ORACLE.COM> SELECT * FROM DEPT;
DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
50 Testing HOUSTON
scott@DB3.US.ORACLE.COM>
Now the changes in DB2 is replicated to DB1, DB3.
Scenario 3
Now delete one record in DB3 instance and check this record is replicated in DB1, DB2 instance.
scott@DB3.US.ORACLE.COM> delete dept
2 where deptno=50;
1 row deleted.
scott@DB3.US.ORACLE.COM> commit;
Commit complete.
scott@DB3.US.ORACLE.COM>
scott@DB1.US.ORACLE.COM> select * from dept;
DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
scott@DB1.US.ORACLE.COM>
scott@DB2.US.ORACLE.COM> select * from dept;
DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
scott@DB2.US.ORACLE.COM>