Forms: Pipe
A pipe is a connector that links two elements in a PhixFlow model. A pipe joining a datasource to a data collector has no editable details.
Form: Pipe Details
The following fields are configured on the Details tab:
...
- Push: data is 'pushed' rather than 'pulled' into the output stream. Every time data is written to the input stream, the pipe 'informs' the output stream and the Analysis Engine will attempt to run stream generation on the output stream. Push pipes are shown as blue lines on modelling screens.
- Pull: This type of pipe will pass data along as it is calculated which may be before the streamset is complete or any of the records have been committed to the database. This may improve performance over other pipes which wait until the input streams have committed their data to the database before reading the data back in again (which allows the database to apply the filtering and aggregation rules). Because PUSH pipes don't get their data from the database, but instead receive it direct from the input stream, any filters or aggregation specified on the pipe will be ignored. Pull pipes are shown as blue lines on modelling screens.
- Look-up: used to reference data, see lookup for more details. Look-up pipes are shown as dotted lines on modelling screens.
...
This field is used to determine which Streamsets to read from the input Stream. There are 4 options available:
- Latest: This will cause the pipe to read only the latest Streamset from the input Stream.
- Previous: This will cause the pipe to read the Streamset before the latest Streamset on the input Stream.
- All: This will cause the pipe to read all Streamsets from the input Stream. However, In some circumstances the input Stream may have Streamsets that have dates in the future relative to the Streamset the output Stream is creating. This may happen for example if you have rolled back a number of Streamsets on the output Stream but have not rolled back the corresponding Streamsets on the input Stream and have then requested that one of the rolled back Streamsets be rebuilt. Some of the Streamsets on the input Stream will then have dates in the future relative to the Streamset you are rebuilding.
By default the pipe will ignore any Streamsets with dates in the future relative to the Streamset you are generating so that if you are rebuilding an old Streamset the pipe will retrieve the same data on the rerun as it retrieved when the Streamset was first built.
Similarly if you are running a Transactional Stream then it is possible that while your analysis run is taking place, other analysis runs which started after yours may have managed to complete before yours thereby generating additional Streamsets on the input Stream with a future data relative to the date of the Streamset you are generating.
For Transactional input Streams it is possible to tell the pipe not to ignore these future Streamsets by ticking the Read Future Data tickbox on the Advanced tab. - Custom: If this option is selected then you need to specify which input Streamsets to read by configuring combinations of the following fields on the Advanced tab:
- Read Future Data
- Only collect from the same run
- Max Stream Sets
- Historied
- From Data Offset
- To Date Offset
...
Normally when a Pipe requests data from a non-static input Stream then that Stream will first attempt to bring itself up to date, generating new Streamsets as necessary, before supplying the data requests. However, if this field is ticked, the input Stream will not attempt to do this.
...
If this flag is not ticked then it is an indication to PhixFlow that the Stream is not ready to be used during any analysis runs and should be therefore be ignored.
The following fields are configured on the Advanced tab:
...
Mandatory
...
If ticked, when multiple Streams are being merged then there must be an input record from this Pipe for an output record to be generated by the output Stream.
If this is a push pipe with positive offsets and this flag is ticked then the notification to create another stream set will only be pushed along the pipe if the last stream set created contains at least one record.
...
The Execution Strategy determines how this pipe should be implemented.
Where this pipe is a push or pull pipe into a Merge Stream, the Default Execution Strategy is to select all stream items from the input Stream sorted by the Group By attributes, then to read items from all input pipes simultaneously, constructing candidate sets from items with matching key values.
Where the Directed Execution Strategy is applied to a pipe (the pipe must not be Mandatory), the other pipes with the Default Strategy operate as above, each being sorted then merged to generate a sequence of candidate sets; the Directed pipe then runs worker tasks to select the additional items by matching key value. These selects are batched up so that each worker reads items for many key values in a single select (see Worker Size), and many workers are run in parallel (see Max Workers).
In general, the Directed strategy should only be used where
- the number of items in the source Stream is so large that the sorting phase of the Default strategy takes too long, or
- only a small subset of the items are needed (the majority being discarded because they have key values that don't match the key values read on one of the other mandatory pipes).
Changing the Execution Strategy will make the Merge faster or slower depending on the input data and the details of the input and output Streams, but will not change the business logic of the Merge (i.e. which input items are grouped into candidate sets).
If the input to the pipe is a Collector, the list of key values is made available as _keyList. This will typically be used with an in clause in the Collector query. For example:
select * from customer where account_num in ({_keyList}) order by account_num
...
The maximum number of concurrent worker tasks.
If blank, this defaults to 1.
...
The number of key values to read for a single worker task (which runs a single select statement).
If blank, this defaults to 1000. This is the maximum value that can be used when reading from an Oracle database.
...
The cache size is used when carrying out lookups from streams or database collectors. During a lookup, PhixFlow will retrieve records in sets that are determined by the filter and index values set on the pipe. For efficiency, the records are cached (stored in memory temporarily) so that if the same set of records need to be looked up again they are readily available without going back to the database.
There is a limit to how many records can be stored in the cache. The user is able to specify this cache size limit on the pipe configuration under the advanced tab. If no value is set there, it will default to the system-wide default, specified in the maximum pipe cache size setting field in the System Tuning tab of the System Configuration dialog.
If a lookup would put a number of records into the cache such that the limit would be exceeded, PhixFlow will remove records from the bottom of the cache until there is enough space for new records. If the number of new records being looked up exceeds the maximum cache size, PhixFlow will still add this set to the cache, unless the enforce cache size flag is ticked in the system tuning dialog. In that case, PhixFlow will terminate the job and log an error on the console, similar to:
The Pipe "stream_name.lookup_pipe_name" cache is 100% full (the cache size is 10).
Additionaly, a warning is generated when the number of cached records approaches the maximum, whether or not the limit is set to be enforced.
...
The offset applied to the start of the collection period, relative to the period in the output stream that requires populating.
The units are the period of the output stream, that is, if the output stream has a daily period, then setting from date offset = -1.0 means that the start of the collection period will be 1 day earlier than the start of the period in the output stream that is being calculated.
If this is a push pipe then a positive offset can be input. This will tell the stream to run again and generate another stream set.
...
The offset applied to the end of the collection period, relative to the period in the output stream that requires populating.
The units are the period of the output stream, that is, if the output stream has a daily period, then setting to date offset = -1.0 means that the end of the collection period will be 1 day earlier than the end of the period in the output stream that is being calculated.
If this is a push pipe then a positive offset can be input. This will tell the stream to run again and generate another stream set.
...
- Only collect from the same run
- Max Stream Sets (this may also be set to zero)
- Historied
...
During Drill Down
...
During Look Ups
...
During File Export
...
The following fields are configured through separate tabs on the form:
...
Form Icons
The form provides the standard form icons.
The form also provides the following icons on the Filter tab:
Adds a clause to the filter. | |
Deletes the selected clause or condition from the filter. | |
Adds a condition to a clause of the filter. |
The form also provides the following icons on both the Sort/Group and Aggregate Attributes tabs:
Shows the list of attributes that can be added as sort/group or aggregate attributes. | |
Deletes the selected object from the list. | |
Adds an object to the list. |
See Also
...
Overview
A pipe is a connector that links two elements in a PhixFlow model and sends data from the input to the output. Pipes allows you to control which attributes and which records from the input are delivered by to the output, although in most cases - with minimal configuration - you will get all columns and the records from the current run.
The pipe must be enabled to make it active.
For advanced configuration, see Advanced Pipe Configuration.
Insert excerpt | ||||||
---|---|---|---|---|---|---|
|
Panel | ||||
---|---|---|---|---|
| ||||
A pipe joining a datasource to a data collector has no details to edit. All the configuration for the output data set occurs in the collector - either a database collector for a database datasource, or an HTTP collector for an HTTP datasource. |
Basic Settings
Field | Description | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Name | Enter a name. The name is used to refer to the pipe in other model elements. Pipe names default to The name:
| ||||||||||||||||
Enabled |
| ||||||||||||||||
Static | Normally when a pipe requests data from a non-static input stream, that stream will first attempt to bring itself up to date, generating new stream sets as necessary, before supplying the data requested. However, if this field is ticked, the input stream will not run. Pipes from collectors cannot be marked as static.
Pipes from collectors cannot be marked as static. | ||||||||||||||||
Mandatory |
If this is a push pipe with positive offsets and this option is ticked then the notification to create another stream set will only be pushed along the pipe if the last stream set created contains at least one record. This causes the pipe to present each candidate set to the output stream in a different way than usual. | ||||||||||||||||
Multiplier |
| ||||||||||||||||
Type | Select:
| ||||||||||||||||
Data to Read | Select the type of input data to use.
| ||||||||||||||||
Read Future Data | Use this option to exclude or include input streams sets that have future dates relative to the stream set you are generating. For details about how future stream sets occur, see Managing Future Stream Sets, above.
| ||||||||||||||||
Input | The name of the object at the start of the pipe. | ||||||||||||||||
Output | The name of the stream at the end of the pipe. |
Anchor | ||||
---|---|---|---|---|
|
In some circumstances the input stream may have stream sets that have dates in the future relative to the stream set being generated for the output stream. This may happen, for example, if:
- you roll-back some stream sets on the output stream
- but do not roll-back the corresponding stream sets on the input stream
- and then request that the output stream is brought up to date.
Some of the stream sets on the input stream will have dates in the future relative to some of the stream sets you are rebuilding.
By default, the Read Future Data check box is not ticked. This means pipes ignore any stream sets with dates in the future relative to the stream set you are generating. You want to ignore future stream sets when you rebuild an old stream set, because you want the pipe to retrieve the same data on the rerun as it retrieved when the stream set was first built.
When you run analysis on a stream with a transactional period, it is possible that as your analysis is still running, a different run can start and complete. This run can generate additional stream sets on the input stream with a future data relative to the date of the stream set you are generating. For transactional input streams, you want the pipe to use these future streams. To do this, tick the Read Future Data check box.
Filter
Filters are made up of a set of clauses; each clause in turn contains a number of conditions. These conditions must be satisfied for data to be passed through the pipe.
Tip |
---|
When the pipe Type is Lookup, the filter controls which data will be cached in memory. |
Field | Description | ||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Cache Size
| Available when Type is Lookup. For lookup pipes, PhixFlow uses the pipe cache when it looks-up data from streams or database collectors. For efficiency, the records are cached (stored temporarily in memory) so that if the same set of records need to be looked up again they are readily available without going back to the database. Enter a number to set a limit on the data cache size available for the pipe. You need to estimate the largest number of records that the lookup pipe will return on a single read. Check whether PhixFlow is looking up:
If you do not set a limit for the cache, PhixFlow uses the system default set in System Configuration → System Tuning → Maximum Pipe Cache Size.
| ||||||||||||||||||||||||||||||||
Pipe View | Use this option to look up data from attributes that are present in a stream view on the input stream. Select a stream view from the list. If the input stream has no views, the list will be empty.
Use the pipe view to limit the attributes that the pipe reads when a stream has lots of attributes containing many data records but you only need data from a few attributes. Only the data for the attributes in the stream view are sent to the output stream. Pipe views are very useful:
To set up a pipe view:
| ||||||||||||||||||||||||||||||||
Include History Records |
| ||||||||||||||||||||||||||||||||
Condition | Select one of the options
To add more conditions, hover your mouse pointer over this field to display the button and add another condition to your filter. | ||||||||||||||||||||||||||||||||
Clause | Select an option from the list. PhixFlow adds more fields where you can:
| ||||||||||||||||||||||||||||||||
Filter Icons | Hover your mouse pointer over conditions or clauses to display:
| ||||||||||||||||||||||||||||||||
Cache Extraction Filter | A cache extraction filter allows you to further filter the data retrieved by a pipe. These are not commonly used, but are sometimes helpful when either:
For case 1, when using a lookup pipe, data retrieved is stored in a cache. See cache size for details. The cache extraction filter allows you, as you are processing a set of output records, to use different cached entries from the lookup for each of the records are you are processing. This is very fast compared to looking up from the source (i.e. going back to an external DB table or even another PhixFlow stream) for each output record. E.g. you want to look up the credit rating for a customer for a set of transactions - in the output, each transaction is represented by a single output record. You create an indexed lookup pipe using CustNo as the key for the index. This means that for each new CustNo you encounter in the data, all the credit rating entries for that CustNo would be retrieved by the pipe and placed into the cache. The credit rating for each customer is fully historied, so you get a number of entries for each CustNo. To get the relevant lookup entry for each output report (each transaction), you need to compare the transaction date of the output record to the dates of credit rating entries in the cache. So to extract the relevant record, you include a cache extraction filter in the form:
Cache extraction filters are entered free hand. The attribute names referenced must exist in a stream. This means that the each attribute must be one of:
|
Filter Examples
Filter on Current User
Sometimes when running analysis you want to select, from the source, only records belonging to the currently logged in user. To set a filter where, say, an attribute in the source Owner
equals the current logged in user, add a condition to the filter like this:
Owner
Equals _user.name
fx
Enter a list of values for an "Is In" or "Is Not In" filter
If you want to based on a list of values, use the Is in or Is not in comparators, then type the list of values into the comparison field as a comma separated list like this:
Country
Is in England, France, Germany
ABC
In this case you must NOT click the ABC icon to convert the value to an fx, because this will indicate that the value is a formula; it must be left as a literal value. If you do click the ABC icon, then the value must be entered like this:
Country
Is in ["England","France","Germany"]
fx
Sort/Group
For lookup pipes this section is called Order/Index.
Use this section to group and sort data as it comes through the pipe. This section has:
- a toolbar with standard buttons
- a grid that lists the attributes that you want to sort or use to group
- below the grid are the following options:
Field | Description |
---|---|
Maximum Number of records per Group | Enter an upper limit for grouped records. When collating the input records into groups, PhixFlow uses the specified sort order. When it has added the maximum number of records, any more records for the group are ignored. This can be useful if you want the most recent record for an attribute that has many records.
|
Index Type | Available when Type is Look-up. Look-up pipes can be configured for fast "indexed" access to cached data collected from external tables, files or from other streams. Indexed access is controlled through configuring a pipe with an index and setting index expressions on grouping attributes. If the Type field on the Pipe is set to 'Look-up' then the field "Index Type" becomes available. This can have the value "None" meaning that there are no index keys or "Exact Match", "Best Match" or "Near Match" as described below:
|
Using the Sort/Group Grid
To add a stream attribute to the list:
- click
to open the list of attributes in the input streamInsert excerpt _attributes_show _attributes_show nopanel true - drag a stream attribute into the grid.
To remove an attribute, click
Insert excerpt | ||||||
---|---|---|---|---|---|---|
|
To set the sort or group properties for an attribute, double-click its name in the grid. If you want to create a new attribute that is not present in the input stream, in the section toolbar, click
Insert excerpt | ||||||
---|---|---|---|---|---|---|
|
Field | Description | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Attribute | For input stream attributes, PhixFlow displays the attribute name. (Read-only) For a new attribute, enter a name. | ||||||||||||||||
Order | Enter the number for the order the attribute appears in the grid and the order in which it is processed. Other attributes are renumbered. | ||||||||||||||||
Direction | Select the sort order
| ||||||||||||||||
Group |
If this attribute is part of the candidate key set, you must tick the Group check box. Otherwise, the attributes will be used only to sort the data in the candidate set. | ||||||||||||||||
Index Expression | This field is available for lookup pipes with an Index Type option selected. Look-up pipes can be configured for fast "indexed" access to cached data. This data is collected from external tables, files or from other streams. Indexed access is controlled through configuring a pipe with an index and setting index expressions on "Group By" attributes here. | ||||||||||||||||
Audit Summary | See Common Properties. |
Tip |
---|
In some cases, you may have a pipe connected to a database collector, which pulls data from an external database table. In these cases, the fields in the database must have matching attribute names in the output stream. You can refer to it using the format |
Aggregate Attributes
Use this section to define the properties of data that you want to combine as it comes through the pipe.
Note |
---|
You cannot aggregate data from attributes if the pipe's input is from: If you need to aggregate data from a database collector, you can use an SQL query. |
This section has:
- a toolbar with standard buttons
- a grid that lists the attributes that you want to aggregate.
Using the Aggregate Attributes Grid
To add a stream attribute to the list:
- click
to open the list of attributes in the input streamInsert excerpt _attributes_show _attributes_show nopanel true - drag a stream attribute into the grid.
To remove an attribute, click
Insert excerpt | ||||||
---|---|---|---|---|---|---|
|
To set the properties for an aggregate attribute, double-click its name in the grid. PhixFlow opens the attribute's sort properties:
Field | Description |
---|---|
Stream Function | Select an function to combine stream data records.
See Aggregate Function for details. Make sure the function matches the data in the attribute. For example, you cannot Sum text. |
Attribute | Select the stream attribute aggregated from the list of attributes in the input stream. PhixFlow does not use the value in this field if the Aggregate Function is Count. |
Name | Enter the name for the aggregated attribute. This can be the same as the original attribute. |
Order | The order of the aggregate attribute in the output stream. |
Advanced
The following options are available when the pipe Type is Push or Pull. Allow Incomplete Stream Sets is also available when the pipe Type is Lookup.
Field | Description | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Data Expected |
| ||||||||||||||||||||||||||
Allow Incomplete Stream Sets |
PhixFlow cannot complete a stream set if:
Pipes that are not used in the analysis run do not try to complete a stream set, so will not report an error. (Unused pipes can occur if they lead to streams on branches of the model that are not being run.) | ||||||||||||||||||||||||||
Buffer Size | Enter a number for the buffer size used to perform the stream calculation. If a large amount of data is being processed, then setting a large buffer size will give better performance. | ||||||||||||||||||||||||||
Max Records To Read | Enter a number for the maximum number of records that should be read down this pipe. The pipe may read more than this number of records if it is configured to carry out multiple reads simultaneously. For example:
| ||||||||||||||||||||||||||
Strategy | Select an option to specify how this pipe should be implemented. See the section on Directed Merge Strategy
| ||||||||||||||||||||||||||
Max Workers | This field is available when Strategy is Directed Enter the maximum number of concurrent worker tasks. When no value is specified, this defaults to 1. | ||||||||||||||||||||||||||
Worker Size | This field is available when Strategy is Directed Enter the number of key values to read for a single worker task, which runs a single select statement. When no value is specified, this defaults to 1000. This is the maximum value that can be used when reading from an Oracle database. | ||||||||||||||||||||||||||
Log Traffic |
|
Custom Data to Read
Anchor | ||||
---|---|---|---|---|
|
The following properties are available in Basic Settings when you set Data To Read to Custom.
Field | Description | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Only collect from same run | Every time the analysis engine runs, all of the stream sets that are created by all of the streams affected by that analysis run are given the same Run ID.
| ||||||||||||||||
From Offset | Enter the offset applied to the start of the collection period, relative to the period in the output stream that requires populating. | ||||||||||||||||
To Offset | Enter the offset applied to the end of the collection period, relative to the period in the output stream that requires populating. | ||||||||||||||||
Max Stream Sets | Enter the number of stream sets to be retrieved from the input stream. For a push pipe with positive offsets. enter the maximum number of stream sets that can be created i.e. the maximum number of cycles this pipe can initiate. | ||||||||||||||||
Historied |
For example, if:
the pipe reads data from the input stream for the period 17/10/07 - 18/10/07. |