Forms: Pipe
A pipe is a connector that links two elements in a PhixFlow model. A pipe joining a datasource to a data collector has no editable details.
Form: Pipe Details
The following fields are configured on the Details tab:
...
- Push: data is 'pushed' rather than 'pulled' into the output stream. Every time data is written to the input stream, the pipe 'informs' the output stream and the Analysis Engine will attempt to run stream generation on the output stream. Push pipes are shown as blue lines on modelling screens.
- Pull: This type of pipe will pass data along as it is calculated which may be before the streamset is complete or any of the records have been committed to the database. This may improve performance over other pipes which wait until the input streams have committed their data to the database before reading the data back in again (which allows the database to apply the filtering and aggregation rules). Because PUSH pipes don't get their data from the database, but instead receive it direct from the input stream, any filters or aggregation specified on the pipe will be ignored. Pull pipes are shown as blue lines on modelling screens.
- Look-up: used to reference data, see lookup for more details. Look-up pipes are shown as dotted lines on modelling screens.
...
This field is used to determine which Streamsets to read from the input Stream. There are 4 options available:
...
and sends data from the input to the output. Pipes allow you to control which attributes and which records from the input are delivered to the output, although in most cases - with minimal configuration - you will get all columns and the records from the current run.
The pipe must be enabled to make it active.
For advanced configuration, see Advanced Pipe Configuration.
A pipe joining a datasource to a data collector has no details to edit. All the configuration for the output data set occurs in the collector - either a database collector for a database datasource, or an HTTP collector for an HTTP datasource.
Name
The name is used to refer to the pipe in other model elements.
The name cannot contain special characters other than the underscore character '_'. It must start with a letter and cannot be an Attribute Function name.
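The naming rules can be sketched as a small validator. This is only an illustration: the set of reserved names below is a hypothetical sample (the real list of Attribute Function names is not given here), and allowing digits after the first letter and comparing case-insensitively are both assumptions.

```python
import re

# Hypothetical sample of reserved names; the real Attribute Function list is not shown here.
RESERVED_FUNCTION_NAMES = {"if", "lookup", "sum"}

# Assumption: letters, digits and underscores are allowed after the leading letter.
_NAME_PATTERN = re.compile(r"[A-Za-z][A-Za-z0-9_]*")


def is_valid_pipe_name(name: str) -> bool:
    """Return True if the name satisfies the three naming rules described above."""
    if _NAME_PATTERN.fullmatch(name) is None:
        return False
    # Assumption: the reserved-name check ignores case.
    return name.lower() not in RESERVED_FUNCTION_NAMES
```

For example, `in_1` passes, while `_in`, `1in` and `cust-lookup` are rejected.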
Type
There are 3 options available:
- Pull: pull pipes are the most common type in PhixFlow - they "pull" data from the input to the output. Pull pipes are shown as solid arrows on models.
- Look-up: look-up pipes are used to enrich data. Typically, you will have one or more pull pipes to supply the base data for an output, and if needed one or more look-up pipes to enrich the base data with values from additional inputs. Look-up pipes are shown as dashed lines on models.
- Push: data is "pushed" rather than "pulled" into the output stream. Push pipes are most commonly used when sending data from streams to exporters (File Exporters, Database Exporters, HTTP Exporters). Push pipes are shown as dotted lines on models.
Data To Read
Specify what input data to use. There are 6 options available:
- Latest: supply data from the current run (the latest stream set). This is the most commonly used option.
- Previous: supply data from the previous run (the previous stream set). This is used when you are comparing data for the current run with data from the previous run, for example, today's data with yesterday's.
- All: supply data from all runs (all stream sets). Selecting All displays the Read Future Data field, which may be used with Transactional streams.
- All Previous: supply data from all runs except the current run (all stream sets except the latest stream set).
- Same Run: this option should only be used where the input and output streams are Transactional. The pipe will only collect data from inputs in the same analysis run. This configuration supports several analysis runs going on at the same time without interfering with each other.
- Custom: Advanced fields are revealed by selecting this option (see Advanced Pipe Configuration), but you are recommended not to update these unless directed to by PhixFlow consultants or support.
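The first four options can be pictured as slices over the input's stream sets. The sketch below is not PhixFlow code: it assumes the stream sets are held in a list ordered oldest to newest, with the last entry belonging to the current run, and it leaves out Same Run and Custom because they depend on run IDs and advanced fields.

```python
from typing import List


def select_stream_sets(stream_sets: List[str], data_to_read: str) -> List[str]:
    """Pick stream sets for a Data To Read option.

    Assumption: stream_sets is ordered oldest to newest and the last
    element is the stream set for the current run.
    """
    if data_to_read == "Latest":
        return stream_sets[-1:]
    if data_to_read == "Previous":
        return stream_sets[-2:-1]
    if data_to_read == "All":
        return list(stream_sets)
    if data_to_read == "All Previous":
        return stream_sets[:-1]
    raise ValueError(f"Option not covered by this sketch: {data_to_read}")
```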
...
- Read Future Data
- Only collect from the same run
- Max Stream Sets
- Historied
- From Data Offset
- To Date Offset
...
In some circumstances the input Stream may have Stream Sets that have dates in the future relative to the Stream Set being generated for the output Stream. This may happen, for example, if you have rolled back a number of Stream Sets on the output Stream but have not rolled back the corresponding Stream Sets on the input Stream, and have then requested that the output Stream is brought up to date. Some of the Stream Sets on the input Stream will have dates in the future relative to some of the Stream Sets you are rebuilding. By default, pipes will ignore any Stream Sets with dates in the future relative to the Stream Set you are generating. This is so that if you are rebuilding an old Stream Set the pipe will retrieve the same data on the rerun as it retrieved when the Stream Set was first built.
Similarly, if you are running a Transactional Stream, it is possible that while your analysis run is taking place, other analysis runs which started after yours may have completed before yours. These will have generated additional Stream Sets on the input Stream with a future date relative to the date of the Stream Set you are generating.
For Transactional input Streams it is possible to tell the pipe not to ignore these future Stream Sets by ticking the Read Future Data tick box, which is available when Data To Read is All or Custom.
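The default behaviour and the effect of Read Future Data can be sketched as a simple date filter. This is an illustration only: representing each stream set by a single date, and treating "future" as strictly later than the stream set being generated, are assumptions.

```python
from datetime import date
from typing import List


def visible_stream_sets(
    input_dates: List[date],
    generating: date,
    read_future_data: bool = False,
) -> List[date]:
    """Return the input stream set dates a pipe would consider.

    Assumption: a stream set is "in the future" when its date is strictly
    later than the date of the stream set being generated.
    """
    if read_future_data:
        return list(input_dates)
    return [d for d in input_dates if d <= generating]
```

With the flag unticked, rebuilding an old stream set sees the same input dates however many later stream sets exist on the input.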
Static
Normally when a pipe requests data from a non-static input stream, that stream will first attempt to bring itself up to date, generating new stream sets as necessary, before supplying the data requested. However, if this field is ticked, the input stream will not
...
If this flag is not ticked then it is an indication to PhixFlow that the Stream is not ready to be used during any analysis runs and should therefore be ignored.
The following fields are configured on the Advanced tab:
...
run. Pipes from collectors cannot be marked as static.
Mandatory
If ticked, when multiple Streams are being merged then there must be an input record from this Pipe for an output record to be generated by the output Stream.
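The effect of the Mandatory flag on a merge can be sketched as follows. The data layout (records grouped by key value for each pipe) and the pipe names in the usage note are hypothetical; the sketch only shows that a key produces output when every mandatory pipe supplies at least one record for it.

```python
from typing import Dict, List, Set


def keys_with_output(
    pipe_records: Dict[str, Dict[str, List[dict]]],
    mandatory: Set[str],
) -> List[str]:
    """Return the key values for which an output record would be generated.

    pipe_records maps pipe name -> key value -> records read for that key.
    """
    all_keys: Set[str] = set()
    for records_by_key in pipe_records.values():
        all_keys.update(records_by_key)
    return sorted(
        key
        for key in all_keys
        if all(pipe_records[pipe].get(key) for pipe in mandatory)
    )
```

With pipes `accounts` (keys A1, A2) and `usage` (keys A2, A3), marking only `accounts` as mandatory yields A1 and A2; marking both yields only A2.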
...
If this is a push pipe with positive offsets and this flag is ticked, then the notification to create another stream set will only be pushed along the pipe if the last stream set created contains at least one record.
Multiplier
...
The Execution Strategy determines how this pipe should be implemented.
Where this pipe is a push or pull pipe into a Merge Stream, the Default Execution Strategy is to select all stream items from the input Stream sorted by the Group By attributes, then to read items from all input pipes simultaneously, constructing candidate sets from items with matching key values.
Where the Directed Execution Strategy is applied to a pipe (the pipe must not be Mandatory), the other pipes with the Default Strategy operate as above, each being sorted then merged to generate a sequence of candidate sets; the Directed pipe then runs worker tasks to select the additional items by matching key value. These selects are batched up so that each worker reads items for many key values in a single select (see Worker Size), and many workers are run in parallel (see Max Workers).
In general, the Directed strategy should only be used where
- the number of items in the source Stream is so large that the sorting phase of the Default strategy takes too long, or
- only a small subset of the items are needed (the majority being discarded because they have key values that don't match the key values read on one of the other mandatory pipes).
Changing the Execution Strategy will make the Merge faster or slower depending on the input data and the details of the input and output Streams, but will not change the business logic of the Merge (i.e. which input items are grouped into candidate sets).
If the input to the pipe is a Collector, the list of key values is made available as _keyList. This will typically be used with an in clause in the Collector query. For example:
select * from customer where account_num in ({_keyList}) order by account_num
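The batching performed for a Directed pipe can be sketched as below. This is not PhixFlow's implementation: the helper names are hypothetical, and rendering `_keyList` as comma-separated quoted SQL literals is an assumption made for illustration.

```python
from typing import Iterator, List, Sequence


def key_batches(keys: Sequence[str], worker_size: int = 1000) -> Iterator[List[str]]:
    """Split key values into batches, one batch per worker select."""
    for start in range(0, len(keys), worker_size):
        yield list(keys[start:start + worker_size])


def render_query(template: str, batch: Sequence[str]) -> str:
    """Substitute a batch of keys into the {_keyList} placeholder.

    Assumption: keys are rendered as quoted string literals, with single
    quotes doubled; the real substitution format may differ.
    """
    key_list = ", ".join("'" + key.replace("'", "''") + "'" for key in batch)
    return template.replace("{_keyList}", key_list)
```

Each batch would then be run as one select by a worker task, with several workers running in parallel.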
...
The maximum number of concurrent worker tasks.
If blank, this defaults to 1.
...
The number of key values to read for a single worker task (which runs a single select statement).
If blank, this defaults to 1000. This is the maximum value that can be used when reading from an Oracle database.
...
When doing a lookup, there are two common scenarios:
- The pipe does a single lookup onto a stream or database table to get a large number of records in one go (e.g. 1000 records)
- The pipe does many lookups, getting a small number of records for each lookup (e.g. 10 records at a time).
Try to estimate the largest number of records that the lookup pipe reads on a single read from a stream or database collector.
The cache is used when carrying out lookups from streams or database collectors. During a lookup, PhixFlow will retrieve records in sets that match the filter on the pipe, e.g.
Code Block |
---|
WHERE AddressLine1 = _out.Address |
For efficiency, the records are cached (stored temporarily in memory) so that if the same set of records need to be looked up again they are readily available without going back to the database.
There is a limit to how many records can be stored in the cache. The Cache Size field allows you to specify this limit. If no value is set, it will default to the system-wide default, specified in the maximum pipe cache size in the System Tuning tab of the System Configuration dialog.
**A warning will be logged to the console if 90% or more of the cache size is brought back in a single read.**
**Your administrator may tick the enforce flag, which will cause an error.**
The Pipe "stream_name.lookup_pipe_name" cache is 100% full (the cache size is 10).
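The 90% threshold check can be sketched like this. The function name is hypothetical, and both the integer rounding and the cap at 100% are assumptions; only the threshold and the message wording come from the text above.

```python
from typing import Optional


def cache_warning(pipe_name: str, records_read: int, cache_size: int) -> Optional[str]:
    """Return a warning message if a single read fills 90% or more of the cache."""
    # Assumption: the percentage is rounded down and capped at 100.
    percent_full = min(100, records_read * 100 // cache_size)
    if percent_full < 90:
        return None
    return (
        f'The Pipe "{pipe_name}" cache is {percent_full}% full '
        f"(the cache size is {cache_size})."
    )
```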
**Technical breakout here**
...
The offset applied to the start of the collection period, relative to the period in the output stream that requires populating.
The units are the period of the output stream, that is, if the output stream has a daily period, then setting from date offset = -1.0 means that the start of the collection period will be 1 day earlier than the start of the period in the output stream that is being calculated.
If this is a push pipe then a positive offset can be input. This will tell the stream to run again and generate another stream set.
...
The offset applied to the end of the collection period, relative to the period in the output stream that requires populating.
The units are the period of the output stream, that is, if the output stream has a daily period, then setting to date offset = -1.0 means that the end of the collection period will be 1 day earlier than the end of the period in the output stream that is being calculated.
If this is a push pipe then a positive offset can be input. This will tell the stream to run again and generate another stream set.
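The offset arithmetic can be sketched as follows, assuming the offsets are multiples of the output period's length. The function name and the example dates are illustrative only.

```python
from datetime import datetime
from typing import Tuple


def collection_period(
    period_start: datetime,
    period_end: datetime,
    from_offset: float = 0.0,
    to_offset: float = 0.0,
) -> Tuple[datetime, datetime]:
    """Shift the output period by the from/to offsets.

    Assumption: one offset unit equals the length of the output period.
    """
    period_length = period_end - period_start
    return (
        period_start + from_offset * period_length,
        period_end + to_offset * period_length,
    )
```

For a daily output period starting 17 October 2007, a from offset of -1.0 and a to offset of 0.0 give a collection period that starts on 16 October and ends on 18 October.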
...
- Only collect from the same run
- Max Stream Sets (this may also be set to zero)
- Historied
...
During Look Ups
...
During File Export
...
During Drill Down
...
This causes the pipe to present each candidate set to the output stream in a different way than usual. The multiplier flag is on the Advanced tab of the form.
For each output record generated by a stream, the stream will get a set of records from each of its input pipes. If the multiplier flag is ticked on one of these, then the stream will generate an output record for each record from the set of records provided by the multiplier pipe. For each output record, each of the other input pipes will provide the same set of records as normal.
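The multiplier behaviour can be sketched as below. The data layout (pipe name mapped to the records it supplies for one candidate set) and the pipe names in the usage note are hypothetical.

```python
from typing import Dict, List, Optional


def candidate_sets(
    inputs: Dict[str, List[dict]],
    multiplier_pipe: Optional[str] = None,
) -> List[Dict[str, List[dict]]]:
    """Return one entry per output record generated for this candidate set.

    Without a multiplier pipe there is a single entry. With one, there is an
    entry per record on that pipe, and the other pipes supply the same
    records each time.
    """
    if multiplier_pipe is None:
        return [inputs]
    outputs = []
    for record in inputs[multiplier_pipe]:
        per_record = dict(inputs)
        per_record[multiplier_pipe] = [record]
        outputs.append(per_record)
    return outputs
```

If an `orders` pipe supplies three records and a `customer` pipe supplies one, ticking the multiplier flag on `orders` produces three output records, each seeing the same customer record.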
Filters, sorting and grouping, aggregating
Filters, sorting and grouping, and aggregating are configured through their own sections on the form:
Field | Description |
---|---|
Filter | Allows the user to set up a filter on the pipe. Also allows the user to set the Include Audit Records flag. If not set, superseded records will be filtered out. |
Sort/Group | Specify the group/order by attributes on the pipe. |
Aggregate attributes | Specify any aggregate attributes on the pipe. |
Advanced | Configure advanced features on the pipe. |
The data being delivered by a pipe can be filtered.
Filters are made up of a set of clauses; each clause in turn contains a number of conditions. These conditions must be satisfied for data to be passed through the pipe.
Form Icons
The form provides the following buttons:
Add a clause or condition. | |
Delete a clause or condition. | |
Specifies that the value entered is a literal value. Click this icon to change this - to specify that the value entered be evaluated as an expression. | |
Specifies that the value entered is evaluated as a PhixFlow expression. Click this icon to change this - to specify that the value entered be treated as a literal. Note: ["123", "234", "345"] looks like a literal value but it can be evaluated as an expression. | |
Open the expression in a larger editor. |
Filter on Current User
Sometimes when running analysis you want to select, from the source, only records belonging to the currently logged in user. To set a filter where, say, an attribute in the source, Owner, equals the currently logged in user, add a condition to the filter like this:
Owner
Equals _user.name
fx
Enter a list of values for an "Is In" or "Is Not In" filter
If you want to filter based on a list of values, use the Is in or Is not in comparators, then type the list of values into the comparison field as a comma-separated list like this:
Country
Is in England, France, Germany
ABC
In this case you must NOT click the ABC icon to convert the value to an fx, because this will indicate that the value is a formula; it must be left as a literal value. If you do click the ABC icon, then the value must be entered like this:
Country
Is in ["England","France","Germany"]
fx
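The difference between the two entry modes can be sketched as two ways of turning the typed text into the same list of values. This is an analogy rather than PhixFlow's parser: treating the expression form as a JSON array is an assumption that happens to fit a list of quoted strings.

```python
import json
from typing import List


def is_in_values(value: str, is_expression: bool) -> List[str]:
    """Turn the comparison field text into a list of values."""
    if is_expression:
        # Assumption: a list expression of string constants parses like a JSON array.
        return json.loads(value)
    # Literal mode: split the comma-separated text and trim spaces.
    return [item.strip() for item in value.split(",")]
```

Both `England, France, Germany` in literal mode and `["England","France","Germany"]` in expression mode yield the same three values.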
Cache Extraction Filter
A cache extraction filter allows you to further filter the data retrieved by a pipe. These are not commonly used, but are sometimes helpful when either:
- Optimising performance on a lookup pipe when for a set of records, the record you require from the lookup depends on non-key data, e.g. the date
- When getting data from a pull pipe when the filter requires that you compare one value in each record with another; this is not possible within a standard filter.
For the first case, when using a lookup pipe, data retrieved is stored in a cache. See cache size for details. The cache extraction filter allows you, as you are processing a set of output records, to use different cached entries from the lookup for each of the records you are processing. This is very fast compared to looking up from the source (i.e. going back to an external DB table or even another PhixFlow stream) for each output record.
For example, suppose you want to look up the credit rating for a customer for a set of transactions - in the output, each transaction is represented by a single output record. You create an indexed lookup pipe using CustNo as the key for the index. This means that for each new CustNo you encounter in the data, all the credit rating entries for that CustNo would be retrieved by the pipe and placed into the cache. The credit rating for each customer is fully historied, so you get a number of entries for each CustNo. To get the relevant lookup entry for each output record (each transaction), you need to compare the transaction date of the output record to the dates of credit rating entries in the cache. So to extract the relevant record, you include a cache extraction filter in the form:
Code Block |
---|
StartDate <= _out.TransDate && (EndDate >= _out.TransDate || EndDate == _NULL) |
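The selection performed on the cached entries can be sketched as follows, assuming an entry applies when its validity period contains the transaction date and that an open-ended entry has no end date. The `Rating` attribute and the sample dates are hypothetical.

```python
from datetime import date
from typing import List, Optional


def rating_for(cached_entries: List[dict], trans_date: date) -> Optional[dict]:
    """Pick the cached credit rating entry that applies on trans_date.

    Assumption: an entry applies when it started on or before the transaction
    date and has either no end date or an end date on or after it.
    """
    for entry in cached_entries:
        started = entry["StartDate"] <= trans_date
        not_ended = entry["EndDate"] is None or entry["EndDate"] >= trans_date
        if started and not_ended:
            return entry
    return None
```

Because all entries for a CustNo are already in the cache, this selection runs in memory for every transaction without another read from the source.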
Cache extraction filters are entered free hand.
The attribute names referenced must exist in a stream. This means that each attribute must be one of:
- an attribute in a source stream, if you are reading from a stream
- if you are reading from an external database table, one of the fields returned by the database collector AND an attribute in the output stream - i.e. to use an attribute with the source as a database collector, there must be an attribute of matching name in the output stream
- an attribute in the destination stream, in which case you will refer to it using the format
_out.AttributeName
A Pipe can be grouped and sorted by attributes of the input stream. These are set up in the Group/Order section of the Pipe form. In fact, this section is called Sort/Group for pull and push pipes, and Order/Index for lookup pipes.
The following fields are configured at the level of the pipe:
Field | Description |
---|---|
Maximum Number of Records Per Group | If a value is entered into this field, then when PhixFlow collates the input records into groups according to the specified Grouping Attribute Details (see below) once the maximum number of records have been added to the group any additional records for this group will be discarded. Records will be added to the group according to the specified sort order. This can be useful, for example, where you know that for a given set of grouping attributes you may get multiple records but you are only interested in the most recent record for that set. In this case you can configure the pipe to group by the grouping attributes, sort by an appropriate date attribute in descending order and set the Maximum Number of Records to 1. This will ensure that you only get the most recent record for the specified group when you read from this pipe. |
Index Type | This setting is only available for lookup pipes. Look-up pipes can be configured for fast "indexed" access to cached data collected from external tables, files or from other streams. Indexed access is controlled through configuring a pipe with an index and setting index expressions on grouping attributes. If the Type field on the Pipe is set to 'Look-up' then the field "Index Type" becomes available. This can have the value "None" meaning that there are no index keys or "Exact Match", "Best Match" or "Near Match" as described below:
|
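The Maximum Number of Records Per Group behaviour described in the table above can be sketched as a group, sort and truncate step. The function and attribute names are hypothetical, and the sketch handles a single grouping attribute and a single descending sort attribute only.

```python
from itertools import groupby
from operator import itemgetter
from typing import List


def limit_per_group(
    records: List[dict],
    group_attr: str,
    sort_attr: str,
    max_per_group: int = 1,
) -> List[dict]:
    """Keep at most max_per_group records per group, newest first."""
    # Sort descending on the sort attribute, then group with a stable sort
    # so the descending order is preserved inside each group.
    ordered = sorted(records, key=itemgetter(sort_attr), reverse=True)
    ordered.sort(key=itemgetter(group_attr))
    result: List[dict] = []
    for _, group in groupby(ordered, key=itemgetter(group_attr)):
        result.extend(list(group)[:max_per_group])
    return result
```

With the limit set to 1 and a descending date sort, each group contributes only its most recent record.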
Form: Grouping Attribute Details
The following fields are configured for each grouping attribute:
Field | Description |
---|---|
Attribute | Name of an attribute in the input stream. |
Order | The position of this attribute in the list of sorting/grouping attributes. |
Direction | The direction of the sort based on this attribute: Ascending; or Descending. |
Group | If this attribute is part of the candidate key set, the Group flag must be ticked. Otherwise, the attributes will be used only to sort the data in the candidate set. |
Index Expression | This field is only available for lookup pipes where you have selected indexing. If the pipe is configured as a Look-up with an index match type set, this field becomes available. Look-up pipes can be configured for fast "indexed" access to cached data. This data is collected from external tables, files or from other streams. Indexed access is controlled through configuring a pipe with an index and setting index expressions on "Group By" attributes here. |
Aggregate Attributes define the aggregated properties that are available when data is read from an aggregating Pipe. Note that Aggregate Attributes are not available on Pipes from Database Collectors (any aggregation can be performed in the query SQL), nor are they available on Pipes from File Collectors.
Possible aggregate values are counts, summations, averages and maximum or minimum values of Stream Items grouped in the Group/Order tab of the Pipe.
Field | Description |
---|---|
Stream Function | The Aggregate Function e.g. Count or Sum. |
Attribute | The name of the Stream Attribute to be aggregated. Note that the value in this field is not used if the Aggregate Function is Count. |
Name | A new name for the aggregated attribute. Note that this can be the same as the original Attribute. |
Order | The order of the aggregate attribute. |
Only aggregate attributes which can be aggregated. For example, do not try to sum an attribute which contains text.
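The aggregate functions can be sketched as a grouped calculation. The function names used as strings below ("Count", "Sum", "Average", "Max", "Min") are labels chosen for this sketch and may not match the exact names offered in the Stream Function field.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def aggregate(
    records: List[dict],
    group_attr: str,
    function: str,
    attribute: Optional[str] = None,
) -> Dict[str, float]:
    """Aggregate records per group; the attribute is ignored for Count."""
    groups: Dict[str, List[dict]] = defaultdict(list)
    for record in records:
        groups[record[group_attr]].append(record)

    result: Dict[str, float] = {}
    for key, items in groups.items():
        if function == "Count":
            result[key] = len(items)
            continue
        values = [item[attribute] for item in items]
        if function == "Sum":
            result[key] = sum(values)
        elif function == "Average":
            result[key] = sum(values) / len(values)
        elif function == "Max":
            result[key] = max(values)
        elif function == "Min":
            result[key] = min(values)
        else:
            raise ValueError(f"Function not covered by this sketch: {function}")
    return result
```

Summing or averaging only makes sense for numeric attributes, which is why text attributes should not be aggregated this way.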
The following fields are available in the Basic Settings section if you set Data To Read = Custom:
Field | Description |
---|---|
From Offset | The offset applied to the start of the collection period, relative to the period in the output stream that requires populating. |
To Offset | The offset applied to the end of the collection period, relative to the period in the output stream that requires populating. |
Max Stream Sets | In almost all cases this specifies the number of stream sets to be retrieved from the input stream. However, if this is a push pipe with positive offsets this value indicates the maximum number of stream sets that can be created i.e. the maximum number of cycles this pipe can initiate. |
Only collect from same run | Every time the analysis engine runs, all of the stream sets that are created by all of the streams affected by that analysis run are given the same Run ID. If this flag is ticked then the pipe will only collect stream sets from the input stream that have the same Run ID as the stream set currently being created by the output stream. You should only use this flag if both the input and output streams are transactional. |
Historied | If ticked, the pipe will collect data from the input stream by period. So if the from and to offsets are both 0.0, and the output stream requires stream generation for the period 17/10/07 - 18/10/07, data will be collected from the input stream for the period 17/10/07 - 18/10/07. If not ticked, all data will be collected from the input stream, regardless of period. In this case, the offsets are still used to determine whether the required data periods in the input stream exist before the stream calculation can be carried out. |
Read Future Data | If you are running a transactional stream then it is possible that while your analysis run is taking place, other analysis runs which started after yours may have managed to complete before yours, generating additional stream sets on the input stream. These additional stream sets will then have a future date relative to the date of the stream set you are generating. By default PhixFlow will ignore input stream sets that have a date in the future relative to the stream set being generated. |
Advanced section
The following fields are configured in the Advanced section:
Form Icons
The form provides the standard form icons.
The form also provides the following icons on the Filter tab:
Adds a clause to the filter. | |
Deletes the selected clause or condition from the filter. | |
Adds a condition to a clause of the filter. |
The form also provides the following icons on both the Sort/Group and Aggregate Attributes tabs:
Shows the list of attributes that can be added as sort/group or aggregate attributes. | |
Deletes the selected object from the list. | |
Adds an object to the list. |
See Also
...
Field | Description |
---|---|
Data Expected | This field is available when the Pipe Type is Push or Pull. This flag allows the user to specify that the pipe is expecting to receive data. If ticked but no data is received, this is treated as an error. |
Allow Incomplete Stream Sets | Normally, when a pipe tries to read from an input stream that contains an incomplete stream set, PhixFlow will attempt to complete the stream set before passing data down the pipe. However, if the stream is static (i.e. the stream has its 'static' flag ticked) or is effectively static (i.e. all of the pipes reading from it in this analysis run are static) then, instead of completing the stream set, an error message is produced indicating that you cannot read from this stream because it contains an incomplete stream set. If you do not want this error message to be produced when reading from static (or effectively static) streams, but would instead prefer PhixFlow to ignore the incomplete stream sets, then you must tick this box on all pipes that will read from the input stream in this analysis run. If there are multiple pipes that read from the input stream during this analysis run and even one of the pipes does not have this box ticked then you will not be allowed to read from the stream and the error message will be produced. Pipes which are not used in the current analysis run (for example where they lead to streams on branches of the model which are not run by the current task plan) have no effect on whether or not the error message is produced. |
Cache Size | The cache is used when carrying out lookups from streams or database collectors. When doing a lookup, there are two common scenarios: (1) the pipe does a single lookup onto a stream or database table to get a large number of records in one go (e.g. 1000 records); (2) the pipe does many lookups, getting a small number of records for each lookup (e.g. 10 records at a time).
In case 2, the results returned are typically based on a key value, e.g. an account number. This will be used in the filter of the pipe, if you are reading from a stream, or in the query, if you are reading from a database collector. For example, the query in a database collector will include the condition:
For efficiency, the records are cached (stored temporarily in memory) so that if the same set of records needs to be looked up again they are readily available without going back to the database. This field allows you to set a limit on the size of the cache. Setting a limit is important because if you do not, the cache can become very large and consume a lot of memory, which can slow down both your tasks and those of other users of PhixFlow. To set the cache size, try to estimate the largest number of records that the lookup pipe will return on a single read. If you do not set a limit, it will default to the system-wide default, specified in the Maximum Pipe Cache Size in the System Tuning tab of the System Configuration. |
Buffer Size | The buffer size used to perform the stream calculation. If a large amount of data is being processed, then setting a large buffer size will give better performance. |
Pipe View | The pipe view is used to limit which fields are retrieved down the pipe and in what order, and in some circumstances how each field is to be formatted. You can select from any of the views that have been configured on the source stream. Please note that any sorting or filtering of records will have to be applied directly on the pipe, and will not be inherited from the pipe view. The pipe view is used in two contexts. During lookups: pipe views can be used on lookup pipes to limit the fields that are returned by the lookup request. This is most useful in the scenario where you want to read and cache data on a lookup pipe from a stream that has lots of attributes but where only a small number of attributes are actually required. You can simply create a new view on the source stream listing only the attributes needed, then specify it as the pipe view on the lookup pipe. Only those attributes specified on the view will then be loaded. During File Export: when sending data to a file exporter, only those fields specified on the pipe view will be exported. If no pipe view is supplied then all fields will be exported. |
Max Records To Read | The maximum number of records that should be read down this pipe. The pipe may read more than this number of records if it is configured to carry out multiple reads simultaneously e.g. if it is connected to a File Collector which has been configured to read multiple files simultaneously or if this pipe strategy is "Directed" with multiple workers |
The following fields are configured through separate tabs on the form:
...
. | |
Strategy | The Execution Strategy determines how this pipe should be implemented. See the section on Directed Merge Strategy. |
Max Workers | This field is only available if Strategy = Directed. The maximum number of concurrent worker tasks. If blank, this defaults to 1. |
Worker Size | This field is only available if Strategy = Directed. The number of key values to read for a single worker task (which runs a single select statement). If blank, this defaults to 1000. This is the maximum value that can be used when reading from an Oracle database. |