PhixFlow Help

Automating data collection

By the end of this chapter you will be able to:

  • Set up automated file collection using a pattern to identify the file to process next
  • Use the internal variable _toDate in the File Pattern Expression of a File Collector
  • Use the internal variables _fromDate and _toDate in queries to external databases
  • Read sequenced files

Reading daily files

In this exercise you will create a stream that runs daily to read daily-generated files containing details of customer sign-ups, by region.
Files containing details of daily sign-ups can be found among the input files, in the directory:
…\inputData\CustomerSignUps\input
First you will need to create a File Collector to read the daily files. Instead of doing an ad-hoc load of the file into PhixFlow as you have done so far, you will set up an automated file collection. This will read a copy of these files held on the PhixFlow server:

  • Create a new model called Process Daily Signup Files
  • Add a File Collector with Name Read CustSignUp Files
  • Double-click on your new File Collector to open the configuration form.
  • In the Details tab:
    • Tick the Enabled flag
    • Leave the Number of Header Lines as 1
    • Leave the flag Allow Non-Scheduled Collector ticked
    • In the field Input Directory Expr. enter the value: "CustomerSignUps/input"

      When setting up automated file collection, the directory is relative to the system-wide default input path set for PhixFlow. To see this:

  • Go to the Admin menu (top right-hand corner of the application)
  • Select System Configuration
  • Go to the System Directories tab
  • Note the entry Import File Location – the Input Directory Expr. you entered in your file collector is relative to this starting location

    In file collectors, when entering the Input Directory Expr., File Pattern Expression, Archive Directory Expression or Error Directory Expression, you must surround a fixed text value with quotes, e.g. "file.txt".
    When entering the Input Directory Expr., Archive Directory Expression or Error Directory Expression, you must use forward slashes (/) as directory separators, even on a Windows platform, e.g. "input/dataFiles".

    • In the field File Pattern Expression enter the value: ".*"
    • Press Apply at the top of the File Collector form
  • Move to the File Columns tab
    • Press Create the file attributes automatically from the header row in the file
    • PhixFlow will read the file and work out the attributes that make up the file. You will see these attributes appear in the Attributes list
  • In the banner of the File Collector details form, press Create a new Stream using the File Collector attributes
  • Have a look at the attributes in the new stream – you can see that these have been derived from the columns in the file collector
  • Save all your changes

The file names in the input directory have the format custSignUp-yyyymmdd.txt. Your stream will be scheduled to run daily, and you want to ensure that the correct file is read in each day. To do this, you will use one of the dates associated with each run.
Remember that in PhixFlow terms a run of a stream is called a Stream Set. In fact, for each Stream Set there are two dates:

  • The from date/time (_fromDtm) – this is the date/time that the stream was last run
  • The to date/time (_toDtm) – this is the date/time that the run goes up to, i.e. the current date/time
  • Open the configuration form for Read CustSignUp Files
  • Set the File Pattern Expression to "custSignUp-" + substring(_toDate, 1, 8) + ".txt"

This will insert the value of _toDate – the end date/time of the run (the Stream Set) being generated – into the file name.
_toDate has the format yyyymmdd.hhmiss, and we only want the date part of this, so we use substring(_toDate, 1, 8) to get the first 8 characters, that is, just the date part (a worked example follows these steps).

  • Press OK to save your changes
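
For example, assuming _toDate evaluates to something like 20090323.120000 when the Effective Date is set to 23/03/2009 (the time part shown here is only illustrative), the File Pattern Expression would evaluate as:

"custSignUp-" + substring("20090323.120000", 1, 8) + ".txt"
= "custSignUp-" + "20090323" + ".txt"
= "custSignUp-20090323.txt"

which is the name of the file for 23/03/2009 in the input directory.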

To test this configuration you need to use the system date override – this simulates the system running at a certain date. The first file is for 23/03/2009 – so you will set the override date to this:

  • Go to Admin and open System Configuration
  • Set the Effective Date field to 23/03/2009
  • If you wait for a while you will see the Effective Date appear in the bottom left-hand corner of the application – however, you don't need to wait for this; the change is applied right away

    System Date Override (Effective Date) can be useful on development and testing installations of PhixFlow – but should never be used on a production system!

  • Run ReadCustSignUpFiles

To view the runs (Stream Sets) for the stream:

  • Hover over the stream and in the tool bar press Show Stream Sets
  • A list of Stream Sets appears (in this case, only one!)
  • Note the To Date – this is the date/time when the stream was run (remember you updated the effective date of the system!)
  • Within each Stream Set the data for one run is stored
  • Double-click on the Stream Set; this will display the data you have just loaded, in a grid
  • This is the same data as you would see if you opened the default view; the default view just shows the data from the latest run on the stream
  • You should see that the Stream Set contains 6 records
  • Update the Effective Date to 24/03/2009 and run the Stream again; another 6 records will be loaded – this time from the file for 24/03/2009
  • Update the Effective Date to 25/03/2009 and run the Stream again; a third Stream Set will be loaded – this time from the file for 25/03/2009
  • If you are finishing this exercise at this point, unset the Effective Date

Reading from external databases using Stream Set dates

In this exercise you will create a stream to read the same data as in the previous exercise – except this time from a database table. As before, the intention is that your stream will run daily. You will use the run (Stream Set) dates each day to find the correct set of records to load.

  • Add a Database Collector, with Name Read CustSignUp Data, to the model you created in the previous exercise
  • Open the configuration form for Read CustSignUp Data
    • Go to the Details tab
    • Select the Datasource CRM (you created this during the Introduction to Modelling course – if you don't have this, use the Datasource CRM R instead)
  • Tick the flags Enabled and Allow Non-Scheduled Collection
  • Enter the Query String:

select * from SOURCE_CUST_SIGNUPS

  • Press Apply
  • In the Query String tab press Create a new Stream using the attributes returned from the query
  • Update the Query String in the Database Collector to:

select * from SOURCE_CUST_SIGNUPS
where COLLECTION_DATE > to_date({_fromDate}, 'yyyymmdd.hh24miss')
and COLLECTION_DATE <= to_date({_toDate}, 'yyyymmdd.hh24miss')
Remember that in this query only those parts contained in {} are interpreted by PhixFlow – the rest is written in the query language of the external database you are collecting data from (in this case, Oracle).
So in this query, PhixFlow will insert _fromDate and _toDate – the start date and end date of each run (Stream Set) – into the query before running it against the database (a worked example follows these steps).

  • Save your changes to the database collector
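
For example, on a run made on 24/03/2009 where the previous run was made on 23/03/2009, the query actually sent to the database would look something like the following (the time parts are only illustrative):

select * from SOURCE_CUST_SIGNUPS
where COLLECTION_DATE > to_date('20090323.120000', 'yyyymmdd.hh24miss')
and COLLECTION_DATE <= to_date('20090324.120000', 'yyyymmdd.hh24miss')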

As in the previous exercise, to test this configuration you need to use the system date override.

  • Update the Effective Date to 23/03/2009 and run the Stream; a Stream Set of 6 records will be loaded – for the period 01/01/2000 (the start date of the Stream) – 23/03/2009
  • Update the Effective Date to 24/03/2009 and run the Stream again; another Stream Set of 6 records will be loaded – this time for the period 23/03/2009 - 24/03/2009
  • Update the Effective Date to 25/03/2009 and run the Stream again; a third Stream Set will be loaded – this time for the period 24/03/2009 – 25/03/2009

If, for some reason, this process doesn't run on a certain day – it will automatically catch up the next time it is run. If, for example, the process was:

  • Run on 23/3/2009
  • Not run on 24/3/2009
  • Run on 25/3/2009

On 25/3/2009 the run would be from the time the stream was last run (23/3/2009) up to the current date (25/3/2009). Since you are passing the start and end dates for your run into the query, on 25/3/2009 the query would be:
select * from SOURCE_CUST_SIGNUPS
where COLLECTION_DATE > to_date('20090323.123208', 'yyyymmdd.hh24miss')
and COLLECTION_DATE <= to_date('20090325.124626', 'yyyymmdd.hh24miss')
So the process would catch up and load two days of data in one go.

  • If you are finishing this exercise at this point, unset the Effective Date

Reading Sequenced Files

In this exercise you will create a Stream to read files that are generated with a sequence number. As in the previous exercise, these contain details of customer sign-ups, by region.
These files can be found among the input files, in the directory:
…\inputData\CustomerSignUpsSeq\input
The file names have the format custSignUp-N.txt where N is a sequence number.
Add a file collector to read in a copy of these files on the PhixFlow server:

  • Add a File Collector, with Name Read CustSignUp Files Seq, to the model you created in the first exercise
  • Open the configuration form for Read CustSignUp Files Seq
  • In the Details tab:
    • Tick the Enabled flag
    • Leave the Number of Header Lines as 1
    • Leave the flag Allow Non-Scheduled Collector ticked
    • In the field Input Directory Expr. enter the value: "CustomerSignUpsSeq/input"
    • In the field File Pattern Expression enter the value: ".*"
    • Press Apply at the top of the File Collector form
  • Move to the File Columns tab
    • Press Create the file attributes automatically from the header row in the file
  • In the banner of the File Collector details form, press Create a new Stream using the File Collector attributes
  • Save your changes to the file collector
  • Add a sequence:
    • Find the button Show the list of Sequences in the left-hand menu bar
    • Press to create a new sequence
    • Enter the following details into the sequence configuration form:
      • Name: SignUpSeq
      • Start Value: 1
      • Press OK to save your changes
  • Update the file collector to use the next sequence number:
    • Open the File Collector configuration form
    • Update the File Pattern Expression to "custSignUp-" + nextValue("SignUpSeq") + ".txt" (a worked example of how this pattern evaluates is given at the end of this exercise)
    • Press OK to save your changes
    • Run Customer SignUps Seq
    • On the first run (in the first Stream Set) you will see 6 records load
    • In the Console, double-click on the analysis run in the Completed Tasks section, and then at the bottom of the Console open the Imported/Exported Files tab – you can see the name of the file you have just read in
    • Run the stream two more times to load in the files with the next two sequence numbers
  • If you are finishing this exercise at this point, unset the Effective Date
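
As a sketch of how the sequence-based pattern behaves, assuming SignUpSeq starts at 1 and that nextValue increments the sequence by 1 each time the collector evaluates the expression:

Run 1:  "custSignUp-" + nextValue("SignUpSeq") + ".txt"  evaluates to  "custSignUp-1.txt"
Run 2:  "custSignUp-" + nextValue("SignUpSeq") + ".txt"  evaluates to  "custSignUp-2.txt"
Run 3:  "custSignUp-" + nextValue("SignUpSeq") + ".txt"  evaluates to  "custSignUp-3.txt"

Unlike the date-based pattern in the first exercise, this pattern does not depend on the Effective Date at all; the collector simply works through the files in sequence order.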

Using the input multiplier to select records since the last read

In this exercise you will create a configuration similar to that in the exercise Reading from external databases using Stream Set dates, except that in this case you will read data from the latest collection date of the previous read up to the current run date.

  • Add a Database Collector, with Name Read CustSignUp Data 2, to the model you created in the previous exercise
  • Open the configuration form for Read CustSignUp Data 2
    • Go to the Details tab
    • Select the Datasource CRM (you created this during the Introduction to Modelling course – if you don't have this, use the Datasource CRM R instead)
  • Tick the flags Enabled and Allow Non-Scheduled Collection
  • Enter the Query String:

select * from SOURCE_CUST_SIGNUPS

  • Press Apply
  • In the Query String tab press Create a new Stream using the attributes returned from the query
  • Update the Query String in the Database Collector to:

select * from SOURCE_CUST_SIGNUPS
where COLLECTION_DATE > {_inputMultiplier}
and COLLECTION_DATE <= to_date({_toDate}, 'yyyymmdd.hh24miss')
Remember that in this query only those parts contained in {} are interpreted by PhixFlow – the rest is written in the query language of the external database you are collecting data from.
So in this query, PhixFlow will insert _inputMultiplier and _toDate into the query before running it against the database.

  • Save your changes to the database collector
  • Update the output stream to set the input multiplier:
    • Add a 'loop' pipe to the stream ReadCustSignUpData2:
      • Start a pipe on the stream
      • Move away from the stream, then back on to it – then press select

A loop pipe linking the stream to itself will appear

      • In the new pipe pop-up form:
        • Set Name to Prev
        • Set Type to Lookup
        • Set the Data To Read field to Previous
        • Press OK
      • In the pipe configuration form:
        • Go to the Aggregate Attributes tab
        • Drag the attribute COLLECTION_DATE into the list of aggregate attributes

The default aggregate function of Maximum is already applied – this is what you want: this pipe will return the maximum collection date from the previous run

      • Press OK to save your changes to the pipe
    • Open the configuration for the stream ReadCustSignUpData2:
      • Go to the Advanced tab
      • Enter the following expression into the Input Multiplier field: ifNull(prev.COLLECTION_DATE, toDate("19710101.000000"))

You need the ifNull condition because the first time the stream is run there will be no previous run, so in this case we set the date to something earlier than the date of any records in the table (a worked example of how this expression evaluates follows these steps).

    • Press OK to save your changes to the stream
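
As a sketch of how the Input Multiplier expression behaves from run to run (the exact values depend on your data, so treat these as illustrative):

First run:   ifNull(prev.COLLECTION_DATE, toDate("19710101.000000"))  evaluates to  01/01/1971 00:00:00
             (there is no previous Stream Set, so prev.COLLECTION_DATE is null and the fallback date is used)
Later runs:  ifNull(prev.COLLECTION_DATE, toDate("19710101.000000"))  evaluates to  the latest COLLECTION_DATE loaded by the previous run

The resulting value is then substituted for {_inputMultiplier} in the Query String, so each run only selects records collected after the latest record already loaded.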

As in the previous exercise, to test this configuration you need to use the system date override.

  • Update the Effective Date to 23/03/2009 and run the Stream; a Stream Set of 6 records will be loaded – for the period 01/01/1971 (the fallback date in the Input Multiplier expression) – 23/03/2009
  • Update the Effective Date to 24/03/2009 and run the Stream again; another Stream Set of 6 records will be loaded – this time for the period 23/03/2009 - 24/03/2009
  • Update the Effective Date to 25/03/2009 and run the Stream again; a third Stream Set will be loaded – this time for the period 24/03/2009 – 25/03/2009

    This is an important configuration – although in this example the results are the same as in the exercise Reading from external databases using Stream Set dates, in some cases you cannot guarantee that the key date of your input data will line up with your PhixFlow run (Stream Set) dates. Since this configuration reads from the latest date of the previous read, the dates used are based entirely on the input data and not on the PhixFlow run dates.
    Note that in practice you generally would not need to add the condition
    <= …{_toDate}…
    because you would simply read everything after the previous read date – taking all the data available in the table after that point. So in the example your query would be:
    select * from SOURCE_CUST_SIGNUPS
    where COLLECTION_DATE > {_inputMultiplier}

  • Unset the Effective Date
