
Query ODX Parquet files with Azure Synapse Workspace

ODX data is now stored in the Parquet file format, and you can query it directly in your ADLS Gen2 storage from SSMS. Once a Synapse Workspace is created and connected, querying the files in your data lake is a convenient way to review and inspect data with ad hoc queries. SQL on-demand in the new Azure Synapse Workspace is a powerful tool for this, because it lets you run SQL from your Synapse Workspace directly against Azure Data Lake.

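In this article you will learn about:

  • ODX Parquet File Structure and Incremental Loads
  • Prerequisites
  • Query the Data Lake with Synapse Workspace and SSMS
  • Using the FILEPATH function to determine ODX version and batch
  • Adding the WHERE clause to filter query result by version
  • Troubleshooting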

ODX Parquet File Structure and Incremental Loads

Version 20.10 brought many new capabilities and changes to the way the Operational Data Exchange stores and manages data. The most significant of these changes is “The ODX Never Forgets”, meaning that, by default, data is no longer truncated from ODX storage. Instead, new full loads are simply appended to the existing data in storage. While the ODX will automatically handle pulling only the most recent version of data into the data warehouse, this can create some challenges when querying from the ODX Storage directly.

To help with this, the ODX now identifies data in a table by batch and version.

  • A batch contains only the rows for a table processed during a single execution.
  • A version consists of one or more batches: the initial full load (i.e. Batch 0000), as well as all subsequent incremental loads (i.e. Batches 0001-9999).

When using Azure Data Lake storage, the ODX creates a unique file structure that can be used to determine the most recent version of data. Starting at the Container level, the file structure for a single table may appear as follows:

  • <DataSource>
    • <Schema_Table>
      • DATA_<2020_10_25…> VERSION create date
        • DATA
          • DATA_0000.parquet Initial full load BATCH
          • DATA_0001.parquet Subsequent incremental batch
          • DATA_0004.parquet Subsequent incremental batch (batch numbers do not always follow an exact increment.)
      • DATA_<2020_10_27…> New Version created due to schema drift or manually executed full load
        • DATA
          • DATA_0000.parquet the same pattern repeats as above…

Using this file/folder structure we can conclude that each version folder will include ALL rows that the ODX has extracted until the next version is created. So to query the most recent version of data extracted by the ODX in Data Lake, you can just look into the latest VERSION folder. The steps below will guide you through this process using Synapse Workspace SQL On-Demand.


 

Prerequisites

  1. Create a Synapse Workspace (MSFT: Create a Synapse Workspace)
    • Note: When creating your workspace, make sure to create your own Security credential, rather than the default, sqladminuser.
  2. Connect SQL Server Management Studio (SSMS) to your workspace (MSFT: Connect Synapse to SSMS)
    • Note: When connecting SSMS to your Synapse workspace, make sure to use the "SQL on-demand" endpoint, rather than the SQL endpoint.

 

Query the Data Lake with Synapse Workspace and SSMS

Using a Synapse Workspace to query Parquet files is straightforward. (MSFT: Query Parquet files)

For illustration, we'll use the AdventureWorks2014 database for our query result examples, specifically the Production.Product table.

  1. In TimeXtender, execute a data source in the ODX (connected to your ADLS Gen2 account).
  2. In Azure Storage Explorer, verify that the data source has been executed.
  3. In SSMS, we'll execute the following query to connect to our files.
    • --Query ODX Parquet files (returns all records)

      SELECT *
      FROM OPENROWSET(
      BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/*/*/*.parquet'
      , FORMAT = 'PARQUET') AS a
      • Note: you must alias the OPENROWSET, or you'll likely receive the error message, "A correlation name must be specified for the bulk rowset in the from clause."
      • Wildcards, using the * symbol, can be used in the file path
      • StorageAccountName is the name of the ADLS Gen2 account
      • ContainerName is the target storage container
      • DataSourceShortName is the short name of the data source, as defined in the ODX
      • Schema_TableName is the data source schema and table name
  4. Upon execution, you'll see that we can run standard SQL directly against the data in the data lake.
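
Because the wildcards are optional, the same query can be pointed at a single version folder to preview a few rows before scanning every file. The following is a minimal sketch and not part of the original steps: <VersionFolderName> is a hypothetical placeholder for one of the DATA_<date…> folder names you see in Azure Storage Explorer, and TOP 10 is an arbitrary preview size.

```sql
--Sketch: preview rows from one version folder only
--<VersionFolderName> is a placeholder (assumption), e.g. one DATA_<date…> folder
SELECT TOP 10 *
FROM OPENROWSET(
    BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/<VersionFolderName>/DATA/*.parquet'
    , FORMAT = 'PARQUET') AS a
```

Only the file name is left as a wildcard here, so every batch file in that one version folder is read and nothing from other versions is returned.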

 

Using the FILEPATH function to determine ODX version and batch

  1. As of TimeXtender 20.10.1, the ODX tables do not include an ODX_Timestamp column, but the file path does contain the timestamp information, so we can use the file path to identify when the data was loaded.
  2. The filepath function can greatly simplify how we'll extract the entire filepath string, or sections of it. To get the entire filepath, we can use the filepath function with no arguments inside the parentheses.
    • --Get the complete filepath
      a.filepath() AS [FilePath]
  3. Next, we can provide an argument to extract a single wildcard match from the full filepath. For example, filepath(1) returns the string matched by the first wildcard in the path, which is the ODX_Version.
    • --Extract the date/time part of the file path
      a.filepath(1) AS [ODX_Version]
  4. Lastly, we can extract the third wildcard from the full filepath, which is the ODX_batch. For example, filepath(3) returns the string matched by the third wildcard in the path (the second wildcard matches the DATA folder).
    • --Extract the batch number part of the file path
      a.filepath(3) AS [ODX_batch]
  5. We can further simplify the filepath wildcard string results by adding "DATA_" before the wildcard symbol in the openrowset function filepath string. For example, "/DATA_*/" will not include "DATA_" in results, but only the characters after it.
    • FROM OPENROWSET(
      BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/DATA_*/*/DATA_*.parquet'
      , FORMAT = 'PARQUET') AS a
  6. Putting the whole thing together would look like the following. We'll add three columns to our query result to help clarify the timestamps, FilePath, ODX_Version, and ODX_Batch.
    • --Query ODX parquet files with TimeStamps on rows. (Returns all records)

      SELECT
      --Get the Filepath (you need to add the table reference or this function will not work)
      a.filepath() AS [FilePath]

      --Extract the date/time part of the file path
      ,a.filepath(1) AS [ODX_Version]

      --Extract the batch number part of the file path
      , a.filepath(3) AS [ODX_batch]

      ,*
      FROM OPENROWSET(
      BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/DATA_*/*/DATA_*.parquet'
      , FORMAT = 'PARQUET') AS a
    • NOTE: Above you must alias the OPENROWSET, and be sure to use the same alias in the references above. 
  7. The query result is much easier to understand, but it still contains duplicate records, because rows from every version folder are returned.
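
To see where the duplicates come from, it can help to count rows per version and batch before filtering anything. The following is a small sketch under the same path assumptions as the queries above; the [Row_Count] column name is chosen here for illustration only.

```sql
--Sketch: count rows per ODX version and batch
SELECT
    a.filepath(1) AS [ODX_Version]
    , a.filepath(3) AS [ODX_batch]
    , COUNT_BIG(*) AS [Row_Count]
FROM OPENROWSET(
    BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/DATA_*/*/DATA_*.parquet'
    , FORMAT = 'PARQUET') AS a
GROUP BY a.filepath(1), a.filepath(3)
ORDER BY [ODX_Version], [ODX_batch]
```

Each output row corresponds to one Parquet file, so more than one distinct ODX_Version value means that more than one full load is present in storage.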

 

Adding the WHERE clause to filter query result by version

  1. Now, to select only the most current data load, we'll add a WHERE clause with a subquery. The subquery needs its own alias (b here), distinct from the alias of the outer OPENROWSET (a).
    • WHERE a.filepath(1) = (
      SELECT MAX(b.filepath(1))
      FROM OPENROWSET(
      BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/DATA_*/*/DATA_*.parquet'
      , FORMAT = 'PARQUET'
      ) AS b )
  2. Putting it together, we can use the following query.
    • --Query and filter ODX Parquet files with TimeStamps on rows. (Returns only latest records)
      SELECT
      --Get the Filepath (you need to add the table reference or this function will not work)
      a.filepath() AS [FilePath]

      --Extract the date/time part of the file path
      ,a.filepath(1) AS [ODX_Version]

      --Extract the batch number part of the file path
      , a.filepath(3) AS [ODX_batch]

      ,*
      FROM OPENROWSET(
      BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/DATA_*/*/DATA_*.parquet'
      , FORMAT = 'PARQUET') AS a

      WHERE a.filepath(1) = (
      SELECT MAX(b.filepath(1))
      FROM OPENROWSET(
      BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/DATA_*/*/DATA_*.parquet'
      , FORMAT = 'PARQUET'
      ) AS b )
    • NOTE: You must alias each OPENROWSET, use a different alias for the outer query and the subquery, and use the matching alias in every filepath reference.
  3. Voila! With this latest query, our results now include only the data from the latest version loaded into the ODX.
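
The latest version folder can still hold several batches, so a row that changed between incremental loads may appear once per batch. One possible way to keep a single row per key is sketched below. It rests on assumptions that are not confirmed by this article: that [ProductID] is the primary key of the example Production.Product table, that a higher batch number always holds the newer copy of a row, and that the table has no existing columns named ODX_batch or rn. It does not handle deleted records.

```sql
--Sketch: keep one row per key from the latest version (assumptions noted above)
WITH latest_version AS (
    SELECT
        a.filepath(3) AS [ODX_batch]
        , *
    FROM OPENROWSET(
        BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/DATA_*/*/DATA_*.parquet'
        , FORMAT = 'PARQUET') AS a
    WHERE a.filepath(1) = (
        SELECT MAX(b.filepath(1))
        FROM OPENROWSET(
            BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/DATA_*/*/DATA_*.parquet'
            , FORMAT = 'PARQUET') AS b )
),
ranked AS (
    SELECT
        *
        --[ProductID] is an assumed key column; replace it with your table's key
        , ROW_NUMBER() OVER (PARTITION BY [ProductID] ORDER BY [ODX_batch] DESC) AS rn
    FROM latest_version
)
SELECT *
FROM ranked
WHERE rn = 1
```

Batch numbers are zero-padded to four digits, so ordering them as strings matches their numeric order. If the same key appears more than once within one batch, this sketch picks one of those rows arbitrarily.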

 

Troubleshooting

  • Error message in SSMS, "Incorrect Syntax near Format"
    • Ensure you are using the SQL on-demand endpoint to connect SSMS to your Synapse Workspace. If you re-connect to your Synapse workspace, you may need to create a new query window so that it points to the SQL on-demand endpoint.
    • If your login to the Synapse Workspace (from SSMS) is sqladminuser, you may try either changing the login credentials (i.e. username and password) or creating a new workspace.
  • Error message in SSMS, "A correlation name must be specified for the bulk rowset in the from clause."
    • Ensure that you have aliased the OPENROWSET clause, and the alias matches throughout the query.
  • Error message in TimeXtender, "errorCode": "2200", "message": "Failure happened on 'Sink' side."
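
For the "Incorrect Syntax near Format" case above, one way to check which kind of endpoint a query window is connected to is to ask the server for its engine edition. This is a suggested check, not a step from the original article.

```sql
--Sketch: check which engine the current query window is connected to
--Per Microsoft's SERVERPROPERTY documentation:
--  6  = Azure Synapse dedicated SQL pool
--  11 = Azure Synapse serverless SQL pool (SQL on-demand)
SELECT SERVERPROPERTY('EngineEdition') AS [EngineEdition]
```

If the value is not the one listed for the serverless pool, reconnect using the SQL on-demand endpoint and open a new query window.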

 


8 Comments

  • Peter Jensen

    Looks great !
    Does this query also take into account the incremental loads having updates / deletes ?

    BR,

    Peter

  • Permanently deleted user

    Hi Peter,

    Great question! Yes, it does. I've added a section at the top of the article, referencing the version and batch, as it relates to incremental load.

    Best,

    Taylor

    Edited by Permanently deleted user
  • Peter Jensen

    Hi Taylor,

    The link you put in your answer doesn't seem to work (but I suppose it links to the top of the article where you talk about incremental).

    I know the query only takes into account the last date-folder for the table, and then reads all the parquet-tables which are in that folder (starting with ...0000 up to e.g. ...0005 - if you had 5 increments).  My question is if and how it will take into account the PK-table to handle updates / deletes ?  I don't see any reference to the PK-table...

    BR,

    Peter

  • David Zebrowitz

    Note that there is an issue with the Openrowset aliases in the last 2 queries of the article.  Both the main select and the subselect are specifying the alias of 'rows'.  This will not work because rows is a reserved word and both the main from and subselect cannot have the same alias.

     

    Here is a working final query:

    --Query and filter ODX Parquet files with TimeStamps on rows. (Returns only latest records)
    SELECT
    --Get the Filepath (you need to add the table reference or this function will not work)
    a.filepath() AS [FilePath]

    --Extract the date/time part of the file path
    ,a.filepath(1) AS [ODX_Version]

    --Convert the parts of the date/time path to a DateTime data type
    , a.filepath(3) AS [ODX_batch]

    ,*
    FROM OPENROWSET(
    BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/DATA_*/*/DATA_*.parquet'
    , FORMAT = 'PARQUET') AS a

    WHERE a.filepath(1) = (
    SELECT MAX(b.filepath(1))
    FROM OPENROWSET(
    BULK 'https://<StorageAccountName>.blob.core.windows.net/<ContainerName>/<DataSourceShortName>/<Schema_TableName>/DATA_*/*/DATA_*.parquet'
    , FORMAT = 'PARQUET'
    ) AS b )

    Edited by David Zebrowitz
  • Rombouts, Jasper

    hi,

    we would like to do something like this from databricks. Do you have the same kind of example code to use the ODX data in databricks? if possible with selection of the last version of the record for incremental like Peter mentioned?

     

    Jasper

  • Peter Jensen

    Hi,
    as requested in the past - and now also by Jasper - can you provide us with the queries for :
    - keeping only the latest version of a record (incr. load) when this record appears in e.g. 5 incremental parquet files (in case we do an initial full read from the ODX)

    - handling deleted records (PK-file)

    Thanks for the feedback.

    Peter.

  • Xiaoqing Hu

    Hi, 

    Thanks for the examples script. However, it seems to me that it selects all records in the latest [ODX_Version] which is a.filepath(1), but does not consider [ODX_batch] that is a.filepath(3). So in the result it will return records in all batches in the latest ODX version, that may include the 'initial' records and the 'updated' records for the same PK?

    Cheers,

    Xiao

  • Xiaoqing Hu

    Hi, 

    Another question. When enabled deletion, then we see a folder called /PRIMARYKEYS next to /DATA. In this /PRIMARYKEYS folder we have

    • PRIMARYKEYS_0000.parquet
    • PRIMARYKEYS_0001.parquet
    • ...

    Seemingly these PK parquet files are not incremental, and the latest batch of the primary keys always contain all valid PK (without deleted PK). Can you confirm this?

    Xiao
