Scheduler Event Based Jobs.

I trawled through the documentation and articles on the web for one which would show an event based job, on the sys users scheduler queue table, rather than my own. Since it took so long to get it all working, I’m documenting it here.
Our Agent.
Since the queue is a secure queue, we have to create an agent to use to connect to the queue.
begin
DBMS_AQADM.CREATE_AQ_AGENT(agent_name=>’PWCDW2′);
end;
/
GRANT EXECUTE ON SYS.DBMS_STREAMS_MESSAGING TO “PWCDW2″;
GRANT EXECUTE ON SYS.DBMS_AQ TO “PWCDW2″ ;
GRANT MANAGE ANY QUEUE TO “PWCDW2″ ;
DECLARE
subscriber sys.aq$_agent;
BEGIN
DBMS_AQADM.CREATE_AQ_AGENT(agent_name => ‘”PWCDW2″‘);
subscriber := sys.aq$_agent(‘”PWCDW2″‘, ‘”SYS”.”SCHEDULER$_EVENT_QUEUE”‘, 0);
dbms_aqadm.add_subscriber(queue_name => ‘”SYS”.”SCHEDULER$_EVENT_QUEUE”‘, subscriber=> subscriber, rule=> ” , transformation=> ”);
DBMS_AQADM.ENABLE_DB_ACCESS(agent_name => ‘”PWCDW2″‘, db_username => ‘”PWCDW2″‘);
END;
/

Objects.
I’m going to use what I consider a practical example, starting with a table we’ll queue 6 materialized views to be refreshed.
create table pwcdw2.t1 (id number);
Now to create materialized views based on the t1 table and the previous materialized view.
CREATE MATERIALIZED VIEW “PWCDW2″.”MV1″
ORGANIZATION HEAP PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 NOCOMPRESS LOGGING
STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT)
TABLESPACE “PWCDW2″
BUILD DEFERRED
USING INDEX
REFRESH FORCE ON DEMAND
USING DEFAULT LOCAL ROLLBACK SEGMENT
DISABLE QUERY REWRITE
AS select * from t1;
CREATE MATERIALIZED VIEW “PWCDW2″.”MV2″
ORGANIZATION HEAP PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 NOCOMPRESS LOGGING
STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT)
TABLESPACE “PWCDW2″
BUILD DEFERRED
USING INDEX
REFRESH FORCE ON DEMAND
USING DEFAULT LOCAL ROLLBACK SEGMENT
DISABLE QUERY REWRITE
AS select mv1.id mid, t1.id from mv1, t1 where mv1.id = t1.id;
CREATE MATERIALIZED VIEW “PWCDW2″.”MV3″
ORGANIZATION HEAP PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 NOCOMPRESS LOGGING
STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT)
TABLESPACE “PWCDW2″
BUILD DEFERRED
USING INDEX
REFRESH FORCE ON DEMAND
USING DEFAULT LOCAL ROLLBACK SEGMENT
DISABLE QUERY REWRITE
AS select mv2.id mid, t1.id from mv2, t1 where mv2.id = t1.id;
CREATE MATERIALIZED VIEW “PWCDW2″.”MV4″
ORGANIZATION HEAP PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 NOCOMPRESS LOGGING
STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT)
TABLESPACE “PWCDW2″
BUILD DEFERRED
USING INDEX
REFRESH FORCE ON DEMAND
USING DEFAULT LOCAL ROLLBACK SEGMENT
DISABLE QUERY REWRITE
AS select mv3.id mid, t1.id from mv3, t1 where mv3.id = t1.id;
CREATE MATERIALIZED VIEW “PWCDW2″.”MV5″
ORGANIZATION HEAP PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 NOCOMPRESS LOGGING
STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT)
TABLESPACE “PWCDW2″
BUILD DEFERRED
USING INDEX
REFRESH FORCE ON DEMAND
USING DEFAULT LOCAL ROLLBACK SEGMENT
DISABLE QUERY REWRITE
AS select mv4.id mid, t1.id from mv4, t1 where mv4.id = t1.id;
CREATE MATERIALIZED VIEW “PWCDW2″.”MV6″
ORGANIZATION HEAP PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 NOCOMPRESS LOGGING
STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT)
TABLESPACE “PWCDW2″
BUILD DEFERRED
USING INDEX
REFRESH FORCE ON DEMAND
USING DEFAULT LOCAL ROLLBACK SEGMENT
DISABLE QUERY REWRITE
AS select mv5.id mid, t1.id from mv5, t1 where mv5.id = t1.id;

Programs.
Rather than create jobs that will do the execution, preferring modular design, I create programs to perform the refresh, each of which will be called in the jobs.
BEGIN
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name=>’”PWCDW2″.”PWC_MV1″‘,
program_action=>’dbms_mview.refresh(”PWCDW2.MV1”);’,
program_type=>’PLSQL_BLOCK’,
number_of_arguments=>0,
comments=>’Refresh Materialized View MV1′,
enabled=>TRUE);
END;
/
BEGIN
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name=>’”PWCDW2″.”PWC_MV2″‘,
program_action=>’dbms_mview.refresh(”PWCDW2.MV2”);’,
program_type=>’PLSQL_BLOCK’,
number_of_arguments=>0,
comments=>’Refresh Materialized View MV2′,
enabled=>TRUE);
END;
/
BEGIN
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name=>’”PWCDW2″.”PWC_MV3″‘,
program_action=>’dbms_mview.refresh(”PWCDW2.MV3”);’,
program_type=>’PLSQL_BLOCK’,
number_of_arguments=>0,
comments=>’Refresh Materialized View MV3′,
enabled=>TRUE);
END;
/
BEGIN
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name=>’”PWCDW2″.”PWC_MV4″‘,
program_action=>’dbms_mview.refresh(”PWCDW2.MV4”);’,
program_type=>’PLSQL_BLOCK’,
number_of_arguments=>0,
comments=>’Refresh Materialized View MV4′,
enabled=>TRUE);
END;
/
BEGIN
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name=>’”PWCDW2″.”PWC_MV5″‘,
program_action=>’dbms_mview.refresh(”PWCDW2.MV5”);’,
program_type=>’PLSQL_BLOCK’,
number_of_arguments=>0,
comments=>’Refresh Materialized View MV5′,
enabled=>TRUE);
END;
/
BEGIN
DBMS_SCHEDULER.CREATE_PROGRAM(
program_name=>’”PWCDW2″.”PWC_MV6″‘,
program_action=>’dbms_mview.refresh(”PWCDW2.MV6”);’,
program_type=>’PLSQL_BLOCK’,
number_of_arguments=>0,
comments=>’Refresh Materialized View MV6′,
enabled=>TRUE);
END;
/

Jobs.
And the jobs to run the programs for us. Notice that each job created will raise it’s events to the sys scheduler table, and each materialized view waits for the previous materialized views completed message.
BEGIN
sys.dbms_scheduler.create_job(
job_name => ‘”PWCDW2″.”JOB_MV1″‘,
program_name => ‘”PWCDW2″.”PRG_REF_MV1″‘,
start_date => systimestamp at time zone ‘+2:00′,
job_class => ‘”DEFAULT_JOB_CLASS”‘,
comments => ‘Refresh JOB_MV1 with program’,
auto_drop => FALSE,
enabled => FALSE);
sys.dbms_scheduler.set_attribute( name => ‘”PWCDW2″.”JOB_MV1″‘, attribute => ‘raise_events’, value => dbms_scheduler.job_started + dbms_scheduler.job_succeeded + dbms_scheduler.job_failed + dbms_scheduler.job_broken + dbms_scheduler.job_completed + dbms_scheduler.job_stopped + dbms_scheduler.job_disabled);
sys.dbms_scheduler.enable( ‘”PWCDW2″.”JOB_MV1″‘ );
END;
/
BEGIN
sys.dbms_scheduler.create_job(
job_name => ‘”PWCDW2″.”JOB_MV2″‘,
program_name => ‘”PWCDW2″.”PRG_REF_MV2″‘,
event_condition => ‘tab.user_data.object_name = ”JOB_MV1” and tab.user_data.event_type=”JOB_SUCCEEDED”’,
queue_spec => ‘SYS.SCHEDULER$_EVENT_QUEUE, PWCDW2′,
start_date => systimestamp at time zone ‘+2:00′,
job_class => ‘”DEFAULT_JOB_CLASS”‘,
comments => ‘Run MV2 refresh after MV1′,
auto_drop => FALSE,
enabled => FALSE);
sys.dbms_scheduler.set_attribute( name => ‘”PWCDW2″.”JOB_MV1″‘, attribute => ‘raise_events’, value => dbms_scheduler.job_started + dbms_scheduler.job_succeeded + dbms_scheduler.job_failed + dbms_scheduler.job_broken + dbms_scheduler.job_completed + dbms_scheduler.job_stopped + dbms_scheduler.job_disabled);
sys.dbms_scheduler.enable( ‘”PWCDW2″.”JOB_MV2″‘ );
END;
/
BEGIN
sys.dbms_scheduler.create_job(
job_name => ‘”PWCDW2″.”JOB_MV3″‘,
program_name => ‘”PWCDW2″.”PRG_REF_MV3″‘,
event_condition => ‘tab.user_data.object_name = ”JOB_MV2” and tab.user_data.event_type=”JOB_SUCCEEDED”’,
queue_spec => ‘SYS.SCHEDULER$_EVENT_QUEUE, PWCDW2′,
start_date => systimestamp at time zone ‘+2:00′,
job_class => ‘”DEFAULT_JOB_CLASS”‘,
comments => ‘Run MV3 refresh after MV2′,
auto_drop => FALSE,
enabled => FALSE);
sys.dbms_scheduler.set_attribute( name => ‘”PWCDW2″.”JOB_MV2″‘, attribute => ‘raise_events’, value => dbms_scheduler.job_started + dbms_scheduler.job_succeeded + dbms_scheduler.job_failed + dbms_scheduler.job_broken + dbms_scheduler.job_completed + dbms_scheduler.job_stopped + dbms_scheduler.job_disabled);
sys.dbms_scheduler.enable( ‘”PWCDW2″.”JOB_MV3″‘ );
END;
/
BEGIN
sys.dbms_scheduler.create_job(
job_name => ‘”PWCDW2″.”JOB_MV4″‘,
program_name => ‘”PWCDW2″.”PRG_REF_MV4″‘,
event_condition => ‘tab.user_data.object_name = ”JOB_MV3” and tab.user_data.event_type=”JOB_SUCCEEDED”’,
queue_spec => ‘SYS.SCHEDULER$_EVENT_QUEUE, PWCDW2′,
start_date => systimestamp at time zone ‘+2:00′,
job_class => ‘”DEFAULT_JOB_CLASS”‘,
comments => ‘Run MV4 refresh after MV3′,
auto_drop => FALSE,
enabled => FALSE);
sys.dbms_scheduler.set_attribute( name => ‘”PWCDW2″.”JOB_MV3″‘, attribute => ‘raise_events’, value => dbms_scheduler.job_started + dbms_scheduler.job_succeeded + dbms_scheduler.job_failed + dbms_scheduler.job_broken + dbms_scheduler.job_completed + dbms_scheduler.job_stopped + dbms_scheduler.job_disabled);
sys.dbms_scheduler.enable( ‘”PWCDW2″.”JOB_MV4″‘ );
END;
/
BEGIN
sys.dbms_scheduler.create_job(
job_name => ‘”PWCDW2″.”JOB_MV5″‘,
program_name => ‘”PWCDW2″.”PRG_REF_MV5″‘,
event_condition => ‘tab.user_data.object_name = ”JOB_MV4” and tab.user_data.event_type=”JOB_SUCCEEDED”’,
queue_spec => ‘SYS.SCHEDULER$_EVENT_QUEUE, PWCDW2′,
start_date => systimestamp at time zone ‘+2:00′,
job_class => ‘”DEFAULT_JOB_CLASS”‘,
comments => ‘Run MV5 refresh after MV4′,
auto_drop => FALSE,
enabled => FALSE);
sys.dbms_scheduler.set_attribute( name => ‘”PWCDW2″.”JOB_MV4″‘, attribute => ‘raise_events’, value => dbms_scheduler.job_started + dbms_scheduler.job_succeeded + dbms_scheduler.job_failed + dbms_scheduler.job_broken + dbms_scheduler.job_completed + dbms_scheduler.job_stopped + dbms_scheduler.job_disabled);
sys.dbms_scheduler.enable( ‘”PWCDW2″.”JOB_MV5″‘ );
END;
/
BEGIN
sys.dbms_scheduler.create_job(
job_name => ‘”PWCDW2″.”JOB_MV6″‘,
program_name => ‘”PWCDW2″.”PRG_REF_MV6″‘,
event_condition => ‘tab.user_data.object_name = ”JOB_MV5” and tab.user_data.event_type=”JOB_SUCCEEDED”’,
queue_spec => ‘SYS.SCHEDULER$_EVENT_QUEUE, PWCDW2′,
start_date => systimestamp at time zone ‘+2:00′,
job_class => ‘”DEFAULT_JOB_CLASS”‘,
comments => ‘Run MV6 refresh after MV5′,
auto_drop => FALSE,
enabled => FALSE);
sys.dbms_scheduler.set_attribute( name => ‘”PWCDW2″.”JOB_MV5″‘, attribute => ‘raise_events’, value => dbms_scheduler.job_started + dbms_scheduler.job_succeeded + dbms_scheduler.job_failed + dbms_scheduler.job_broken + dbms_scheduler.job_completed + dbms_scheduler.job_stopped + dbms_scheduler.job_disabled);
sys.dbms_scheduler.enable( ‘”PWCDW2″.”JOB_MV6″‘ );
END;
/

Recap.
So, we’ve covered a lot there. Let’s recap on what’s created.
SQL> select mview_name, last_refresh_date from user_mviews where mview_name like ‘MV%’ order by mview_name;
MVIEW_NAME LAST_REFRESH_DATE
—————————— ——————
MV1
MV2
MV3
MV4
MV5
MV6
6 rows selected.
SQL> select program_name, program_type, program_action, enabled from user_scheduler_programs;
PROGRAM_NAME PROGRAM_TYPE
—————————— —————-
PROGRAM_ACTION
————————————————————————————————————————————
ENABL
—–
PRG_REF_MV1 PLSQL_BLOCK
dbms_mview.refresh(‘PWCDW2.MV1′);
TRUE
PRG_REF_MV2 PLSQL_BLOCK
dbms_mview.refresh(‘PWCDW2.MV2′);
TRUE
PRG_REF_MV3 PLSQL_BLOCK
dbms_mview.refresh(‘PWCDW2.MV3′);
TRUE
PRG_REF_MV4 PLSQL_BLOCK
dbms_mview.refresh(‘PWCDW2.MV4′);
TRUE
PRG_REF_MV5 PLSQL_BLOCK
dbms_mview.refresh(‘PWCDW2.MV5′);
TRUE
PRG_REF_MV6 PLSQL_BLOCK
dbms_mview.refresh(‘PWCDW2.MV6′);
TRUE
6 rows selected.
SQL> select JOB_NAME, program_name, event_queue_owner, event_queue_name, event_queue_agent from user_scheduler_jobs order by job_name;
JOB_NAME
——————————
PROGRAM_NAME
————————————————————————————————————————————
EVENT_QUEUE_OWNER EVENT_QUEUE_NAME EVENT_QUEUE_AGENT
—————————— —————————— ——————————
JOB_MV1
PRG_REF_MV1
JOB_MV2
PRG_REF_MV2
SYS SCHEDULER$_EVENT_QUEUE PWCDW2
JOB_MV3
PRG_REF_MV3
SYS SCHEDULER$_EVENT_QUEUE PWCDW2
JOB_MV4
PRG_REF_MV4
SYS SCHEDULER$_EVENT_QUEUE PWCDW2
JOB_MV5
PRG_REF_MV5
SYS SCHEDULER$_EVENT_QUEUE PWCDW2
JOB_MV6
PRG_REF_MV6
SYS SCHEDULER$_EVENT_QUEUE PWCDW2
6 rows selected.
Testing.
Let’s put some data in the table first.
SQL> begin
2 for i in 1 .. 100000 loop
3 insert into t1 values (i);
4 end loop;
5 end;
6 /
PL/SQL procedure successfully completed.

Testing.
And we can run the job from basically anywhere, EM or scheduled. Remember that the first to be started is MV1 since all the others are waiting for the events to be raised.
SQL> exec dbms_scheduler.run_job(‘JOB_MV1′);
PL/SQL procedure successfully completed.
SQL> col program_name for a20
SQL> select job_name, program_name, state from user_scheduler_jobs order by job_name;
JOB_NAME PROGRAM_NAME STATE
—————————— ——————– —————
JOB_MV1 PRG_REF_MV1 SUCCEEDED
JOB_MV2 PRG_REF_MV2 SCHEDULED
JOB_MV3 PRG_REF_MV3 SCHEDULED
JOB_MV4 PRG_REF_MV4 SCHEDULED
JOB_MV5 PRG_REF_MV5 SCHEDULED
JOB_MV6 PRG_REF_MV6 SCHEDULED
6 rows selected.
So, just 1 has completed, but the others ? What happened to them ?
SQL> select mview_name, last_refresh_date from user_mviews where mview_name like ‘MV%’ order by mview_name;
MVIEW_NAME LAST_REFRESH_DATE
—————————— ——————
MV1 23-Aug-10 15:27:54
MV2 23-Aug-10 15:27:56
MV3 23-Aug-10 15:27:57
MV4 23-Aug-10 15:27:59
MV5 23-Aug-10 15:28:00
MV6 23-Aug-10 15:28:01
6 rows selected.
So they all completed fine.
SQL> select job_name, status, error# from user_scheduler_job_run_details
2 where log_date > sysdate – (1/24/6);
JOB_NAME STATUS ERROR#
—————————————————————– —————————— ———-
JOB_MV5 SUCCEEDED 0
JOB_MV2 SUCCEEDED 0
JOB_MV3 SUCCEEDED 0
JOB_MV4 SUCCEEDED 0
JOB_MV6 SUCCEEDED 0
JOB_MV1 SUCCEEDED 0
6 rows selected.
Based on this, you can create steps to be taken whenever a previous job completes, fails, stalls, basically any action. However, these are not being defined as chains and rules, but as event driven processes.
This entry was posted in Oracle and tagged , , . Bookmark the permalink.