Provenance Challenge: Karma3, Indiana University

Participating Team

Team and Project Details

Workflow Representation

As described in the project overview, Karma is not tightly coupled to a particular workflow system. Provenance collection is implemented through modular instrumentation, which gives Karma the flexibility to capture provenance from different sources and diverse workflow architectures. Unlike all our previous applications, where Karma was implemented as a messenger bus listener, in this project provenance data were ingested synchronously through the Karma web service. We simulate the workflow execution using a client-service scenario, in which a client (which could be a workflow engine, e.g., ODE) schedules and invokes each service. The following figure shows how we set up the Third Provenance Challenge (PC3) workflow. There are three components, all of which are built on Axis2 web services.

PC3 workflow setup diagram

Open Provenance Model Output

OPM defines three entities which can be represented in Karma as follows:

OPM defines five types of causal dependencies:

Karma has the following causal dependencies that map to OPM dependencies:

In this project, we captured artifact, process, used, wasGeneratedBy, wasTriggeredBy, and wasDerivedFrom.

The OPM output for each job execution is shown below.

Job ID    OPM Output
J062941   J062941-opm.xml
J062942   J062942-opm.xml
J062943   J062943-opm.xml
J062944   J062944-opm.xml
J062945   J062945-opm.xml

Query Results

Core Queries

Query 1
For a given detection, which CSV files contributed to it?
Since Karma does not instrument database provenance, we query the detection IDs from the P2Detection table after the LoadCSVFileIntoTable execution and send them as a data product produced by LoadCSVFileIntoTable. The data product is stored in XML format and can be retrieved with an XPath query.
select distinct extractvalue(desc_and_annotation, '//ns2:FileEntry/ns1:FilePath')
from data_consumed
where entity_id = (select entity_id
                   from data_produced
                   where extractvalue(desc_and_annotation, '/detectionIDs/detectID=261887481030000003'));
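
The XPath lookup that extractvalue performs on desc_and_annotation can be tried outside the database. The following is a minimal sketch in Python using the standard library's ElementTree in place of MySQL; the XML document, the namespace URIs and the file path are hypothetical placeholders, since the actual Karma annotation schema is not shown here.

```python
import xml.etree.ElementTree as ET

# Hypothetical data-product annotation. The namespace URIs and the file path
# are placeholders for illustration, not values taken from Karma.
ANNOTATION = """
<ns2:LoadCSVFileIntoTable xmlns:ns1="urn:example:ns1" xmlns:ns2="urn:example:ns2">
  <ns2:FileEntry>
    <ns1:FilePath>SampleData/J062941/P2Detection.csv</ns1:FilePath>
    <ns1:TargetTable>P2Detection</ns1:TargetTable>
  </ns2:FileEntry>
</ns2:LoadCSVFileIntoTable>
"""

# Prefix-to-URI map; must match the (assumed) URIs declared in the document.
NAMESPACES = {"ns1": "urn:example:ns1", "ns2": "urn:example:ns2"}


def file_paths(annotation_xml):
    """Return the text of every ns2:FileEntry/ns1:FilePath element."""
    root = ET.fromstring(annotation_xml)
    nodes = root.findall(".//ns2:FileEntry/ns1:FilePath", NAMESPACES)
    return [node.text for node in nodes]


paths = file_paths(ANNOTATION)
print(paths)
```

The path expression mirrors '//ns2:FileEntry/ns1:FilePath' from the query above; ElementTree needs the leading "." because it evaluates paths relative to the root element.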

Query 2
The user considers a table to contain values they do not expect. Was the range check (IsMatchTableColumnRanges) performed for this table?
We answer this query by checking if the given table was consumed by IsMatchTableColumnRanges. Assume the table is P2Detection.
select count(*)
from (select c.desc_and_annotation
      from data_consumed c, entity e
      where c.entity_id = e.entity_id and
            e.workflow_node_id = 'IsMatchTableColumnRanges') ce
where extractvalue(ce.desc_and_annotation,'/ns2:IsMatchTableColumnRanges/ns2:FileEntry/ns1:TargetTable')='P2Detection'
count for table P2Detection: 2
Yes, IsMatchTableColumnRanges was performed on P2Detection.

Query 3
Which operation executions were strictly necessary for the Image table to contain a particular (non-computed) value?
We answer this query in three steps. First, find the entity for LoadCSVFileIntoTable and the data (P2ImageMeta) consumed by LoadCSVFileIntoTable.
select distinct c.entity_id
from data_consumed c, entity e 
where extractvalue(c.desc_and_annotation, '//ns1:TargetTable') = 'P2ImageMeta' and 
      workflow_node_id = 'LoadCSVFileIntoTable' and 
      c.entity_id = e.entity_id
Second, recursively perform the following query to find all causal dependencies of the LoadCSVFileIntoTable entity. Note that _entityID_ is a variable.
select entity_id 
from data_produced p, 
     (select data_product_id 
      from data_consumed 
      where entity_id = _entityID_) c 
where p.data_product_id = c.data_product_id
Finally, we perform a join of the entity table and invocation table to get the iteration information.
select workflow_node_id, extractvalue(desc_and_annotation, '/Iteration')
from entity, invocation
where invokee_id = _entityID_ and entity_id = invokee_id
Process         Iteration
LoadCSVFileIntoTable   2
ReadCSVFileColumnNames   2
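
The recursion in the second step can be driven from client code that re-issues the same query for each newly found entity. The sketch below is one possible way to do that, assuming an in-memory SQLite database as a stand-in for the Karma store; only the table and column names come from the queries above, while the function name upstream_entities and the sample rows are hypothetical.

```python
import sqlite3


def upstream_entities(conn, start_entity_id):
    """Collect every entity that start_entity_id causally depends on.

    Repeats the step-two query: find the producers of the data products
    consumed by the current entity, then continue from each new producer.
    """
    seen = set()
    frontier = [start_entity_id]
    while frontier:
        entity_id = frontier.pop()
        rows = conn.execute(
            "select p.entity_id "
            "from data_produced p, "
            "     (select data_product_id "
            "      from data_consumed "
            "      where entity_id = ?) c "
            "where p.data_product_id = c.data_product_id",
            (entity_id,),
        ).fetchall()
        for (producer,) in rows:
            if producer not in seen:  # guards against revisiting entities
                seen.add(producer)
                frontier.append(producer)
    return seen


# Hypothetical sample data: e1 produces d1, e2 consumes d1 and produces d2,
# e3 consumes d2.
conn = sqlite3.connect(":memory:")
conn.executescript("""
create table data_produced (entity_id text, data_product_id text);
create table data_consumed (entity_id text, data_product_id text);
insert into data_produced values ('e1', 'd1'), ('e2', 'd2');
insert into data_consumed values ('e2', 'd1'), ('e3', 'd2');
""")

ancestors = upstream_entities(conn, "e3")
print(sorted(ancestors))
```

Each entity returned can then be passed as _entityID_ to the final join to obtain its workflow node and iteration.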

Optional Queries

Optional Query 1
The workflow halts due to failing an IsMatchTableColumnRanges check. How many tables successfully loaded before the workflow halted due to a failed check?
To simulate a workflow halt caused by a failing IsMatchTableColumnRanges check, we modify LoadConstants.java in the package info.ipaw.pc3.psloadworkflow.logic of PC3Service, and then run the following query to see how many times IsMatchTableColumnRanges was invoked.
select count(*)
from invocation,
     (select entity_id
      from entity 
      where workflow_node_id = 'IsMatchTableColumnRanges') e
where invokee_id = e.entity_id
Since IsMatchTableColumnRanges was invoked three times and one invocation failed, two tables were successfully loaded.

Optional Query 3
A CSV or header file is deleted during the workflow's execution. How much time expired between a successful IsMatchCSVFileTables test (when the file existed) and an unsuccessful IsExistsCSVFile test (when the file had been deleted)?
To answer this query, we delete the file "P2ImageMeta.csv.hdr" during the workflow execution.
select timediff(t1, t2)
from (select production_time as t1
      from data_produced
      where extractvalue(desc_and_annotation, '/ns2:IsExistsCSVFileResponse/ns2:return')='false') p1,
     (select production_time as t2
      from data_produced
      where extractvalue(desc_and_annotation, '/ns2:IsMatchCSVFileTablesResponse/ns2:return')='true') p2

Optional Query 5
A user executes the workflow many times (say 5 times) over different sets of data (j062941, j062942, ... j062945). He wants to determine which of the executions halted.
To answer this query, we delete the file "P2ImageMeta.csv" for the job j062943.
select extractvalue(desc_and_annotation, '//JobID') as JobID,
       extractvalue(desc_and_annotation, '//CVSRootPath') as CSVRootPath
from invocation
where invokee_id = (select entity_id
                    from entity
                    where instance = (select e.instance
                                      from data_produced p, entity e
                                      where extractvalue(p.desc_and_annotation, '//ns2:return')='false' and
                                            p.entity_id = e.entity_id) and
                          workflow_id = 'http://localhost:8080/workflow/pc3' and
                          service_id = '')
JobID   CSVRootPath
J062943   C:\PC3\SampleData\J062943

Optional Query 6
Determine the step where the halt occurred?
Similar to Optional Query 1, we simulate a halt caused by a failing IsMatchTableColumnRanges check.
select workflow_node_id, extractvalue(i.desc_and_annotation, '/Iteration') as iteration
from data_produced p, entity e, invocation i
where extractvalue(p.desc_and_annotation, '//ns2:return')='false' and
      p.entity_id = e.entity_id and e.entity_id = i.invokee_id
workflow_node_id      iteration
IsMatchTableColumnRanges   3

Optional Query 8
Which steps were completed successfully before the halt occurred?
Following Optional Query 6,
select workflow_node_id, extractvalue(desc_and_annotation, '/Iteration') as iteration
from entity, invocation
where entity_id = invokee_id and
      invocation_id < (select invocation_id
                         from data_produced p, invocation
                         where extractvalue(p.desc_and_annotation, '//ns2:return')='false' and
                               entity_id = invokee_id)
workflow_node_id            Iteration
IsExistsCSVFile             1
ReadCSVFileColumnNames      1
IsMatchCSVFileColumnNames   1
LoadCSVFileIntoTable        1
UpdateComputedColumns       1
IsMatchTableRowCount        1
IsMatchTableColumnRanges    1
IsExistsCSVFile             2
ReadCSVFileColumnNames      2
IsMatchCSVFileColumnNames   2
LoadCSVFileIntoTable        2
UpdateComputedColumns       2
IsMatchTableRowCount        2
IsMatchTableColumnRanges    2
IsExistsCSVFile             3
ReadCSVFileColumnNames      3
IsMatchCSVFileColumnNames   3
LoadCSVFileIntoTable        3
UpdateComputedColumns       3
IsMatchTableRowCount        3

Optional Query 10
For a workflow execution, determine the user inputs?
We have two solutions for this question. Solution 1: we find the data products that appear in the data_consumed table but not in the data_produced table.
select distinct data_consumed.data_product_id
from data_consumed
left join data_produced
on data_consumed.data_product_id = data_produced.data_product_id
where data_produced.data_product_id IS NULL;
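
Solution 1 is an anti-join: a data product that some step consumed but no step produced must have come from outside the workflow. A minimal sketch of the idea follows, assuming an in-memory SQLite database in place of the Karma store; the table and column names follow the query above, and the sample rows and the function name user_inputs are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical sample data: in1 and in2 are supplied by the user, while d1 and
# d2 are intermediate products created by workflow steps e1 and e2.
conn.executescript("""
create table data_produced (entity_id text, data_product_id text);
create table data_consumed (entity_id text, data_product_id text);
insert into data_produced values ('e1', 'd1'), ('e2', 'd2');
insert into data_consumed values ('e1', 'in1'), ('e1', 'in2'),
                                 ('e2', 'd1'), ('e3', 'in1'), ('e3', 'd2');
""")


def user_inputs(conn):
    """Return data products that were consumed but never produced."""
    rows = conn.execute(
        "select distinct c.data_product_id "
        "from data_consumed c "
        "left join data_produced p "
        "  on c.data_product_id = p.data_product_id "
        "where p.data_product_id is null "
        "order by c.data_product_id"
    ).fetchall()
    return [row[0] for row in rows]


inputs = user_inputs(conn)
print(inputs)
```

The left join keeps every consumed product, and the "is null" filter retains only those with no matching producer row.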
Solution 2: we send the user input along with the "InvokingWorkflow" notification. Thus, we can directly join the invocation and invocation_status tables to get the data product that contains the user input.
select i.desc_and_annotation
from invocation i, invocation_status
where i.invocation_id = invocation_status.invocation_id and
invocation_status = 'InvokingWorkflow' into @xml;
select extractvalue(@xml, '//JobID');
select extractvalue(@xml, '//CVSRootPath');
JobID: J062942
CVSRootPath: C:\PC3\SampleData\J062942

Optional Query 11
For a workflow execution, determine steps that required user inputs?
Following Optional Query 10, get the methods that consumed the user input "CSVRootPath".
select workflow_node_id 
from entity 
where entity_id in (select entity_id
                    from data_consumed
                    where data_product_id = '')
Next, find the methods for the user input JobID.
select workflow_node_id 
from entity 
where entity_id in (select entity_id
                    from data_consumed
                    where data_product_id = '')

Optional Query 12
To answer this query, we delete the file P2ImageMeta.csv.hdr.
select distinct extractvalue(c.desc_and_annotation, '//ns2:FileEntry/ns1:TargetTable')
from data_consumed c,
     (select entity_id
      from data_produced
      where extractvalue(desc_and_annotation, '/ns2:IsMatchTableColumnRangesResponse/ns2:return')='true') p
where c.entity_id = p.entity_id

Optional Query 13
Step dependency: same as the OPM wasTriggeredBy
select p.entity_id as producer, c.entity_id as consumer
from data_consumed c, data_produced p
where c.data_product_id = p.data_product_id
Data dependency: same as the OPM wasDerivedFrom
select c.entity_id, p.data_product_id as output, c.data_product_id as input
from data_consumed c, data_produced p
where c.entity_id = p.entity_id
See OPM output for results.
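
To make the difference between the two joins concrete, the sketch below runs both against a small in-memory SQLite database. It is an illustration under assumptions: the table and column names follow the queries above, while the three-step chain of sample rows is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical chain: e1 produces d1, e2 consumes d1 and produces d2,
# e3 consumes d2.
conn.executescript("""
create table data_produced (entity_id text, data_product_id text);
create table data_consumed (entity_id text, data_product_id text);
insert into data_produced values ('e1', 'd1'), ('e2', 'd2');
insert into data_consumed values ('e2', 'd1'), ('e3', 'd2');
""")

# Step dependency: join on the shared data product, which links the step
# that produced it to the step that consumed it.
step_deps = conn.execute(
    "select p.entity_id as producer, c.entity_id as consumer "
    "from data_consumed c, data_produced p "
    "where c.data_product_id = p.data_product_id "
    "order by producer"
).fetchall()

# Data dependency: join on the shared step, which links each output of a
# step to each input of the same step.
data_deps = conn.execute(
    "select c.entity_id, p.data_product_id as output, "
    "       c.data_product_id as input "
    "from data_consumed c, data_produced p "
    "where c.entity_id = p.entity_id"
).fetchall()

print(step_deps)
print(data_deps)
```

Note that the data dependency join relates every output of a step to every input of that step, which may over-approximate derivation when a step has several independent inputs and outputs.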

Suggested Workflow Variants

Suggested Queries

Suggestions for Modification of the Open Provenance Model


-- BinCao - 06 Apr 2009

