Incremental SQL

Hello,

I want to load a big SQL table into S3. I could use a pandas.SQLQueryDataSet and then an IncrementalDataSet, but the problem with that approach is that the initial query gets very slow because it queries the whole table. I would like to resume the query from the date where the IncrementalDataSet finished last time. Any idea how to do this?

Thanks!

I have yet to get an incremental dataset working correctly myself. Either I misunderstand it or my data does not fit the model. It seems simple enough, but every time I try I feel like I can't quite get it right in practice.

Something I have done in the past is generate the chunks by "hand". I create a list of the chunks, which might mean splitting the dataset by year or by some group id. From that list I generate a catalog.yml entry for each separate group and each layer it goes through, plus a combined dataset. Then I generate the nodes off of the same list in a for loop. It's pretty simple and straightforward for my brain: lay it out as if you are doing it for one group, then wrap it in a for group in groups loop, roughly like the sketch below.

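In rough outline it looks like this; the group names, file paths, and the process function are placeholders for whatever your data actually needs:

from kedro.pipeline import Pipeline, node

groups = ["2019", "2020", "2021"]  # e.g. one chunk per year

# One catalog entry per group (this dict could also be dumped to catalog.yml,
# or repeated in the same way for each layer and for a combined dataset)
catalog_entries = {
    f"raw_{group}": {
        "type": "pandas.CSVDataSet",
        "filepath": f"data/01_raw/table_{group}.csv",
    }
    for group in groups
}


def process(df):
    # placeholder for whatever each chunk goes through
    return df


# Nodes generated off the same list in a for loop
pipeline = Pipeline(
    [
        node(
            func=process,
            inputs=f"raw_{group}",
            outputs=f"intermediate_{group}",
            name=f"process_{group}",
        )
        for group in groups
    ]
)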

I have also had this issue before with both the SQL dataset and the API dataset. I have a working solution based on this example (Finishing our SG API Pipeline with Chronocoding - How I Write Pipes Part V - YouTube).
So here is my example, where I download SQL data in business-day increments. It is not really the "kedro" way of doing things, since I am reading data without using the data catalog…

My data catalog looks like this, where last_download_date is a TextDataSet that reads the CHECKPOINT file created by the IncrementalDataSet. Hint: you will need to manually create this CHECKPOINT with your desired start date before your first run (see the small snippet just after the catalog entries).

last_download_date:
  type: text.TextDataSet
  filepath: data/01_raw/CHECKPOINT
  layer: raw

# ――――――――― incremental ―――――――――  
incremental_sql:
  type: IncrementalDataSet
  path: data/01_raw/eclipse
  filename_suffix: ".csv"
  dataset: pandas.CSVDataSet
  layer: raw
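
To seed the CHECKPOINT before the first run, something as simple as this works (the start date here is just an example):

from pathlib import Path

# One-off: create the CHECKPOINT with the date you want to start downloading from
Path("data/01_raw/CHECKPOINT").write_text("2021-01-01")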

Then, in my parameters.yml (simply because I don't want it cluttering my code :)), I put my SQL string, with “insert_date_here” marking where the date I want to query for will be inserted. These are normally the start and end dates of your query.

sql_string:
    SET NOCOUNT ON; SELECT VALUE_TIME,VALUE FROM ts_read('/folder/Value','insert_date_here','insert_date_here','') ORDER BY VALUE_TIME

Then, in my data engineering pipeline, I add the following nodes:

from datetime import datetime
from typing import Any, Dict

import pandas as pd
from kedro.config import ConfigLoader
from progressbar import ProgressBar  # progressbar2 package


def find_download_dates(last_download_date: str) -> pd.Index:
    """Based on the last day in the CHECKPOINT file, creates a list of all
    business days up to yesterday to download, in %Y-%m-%d format.
        Inputs (str):
            last_download_date: Date of the last download
        Returns (pd.Index):
            download_dates: List of dates to download
    """
    first_download_date = pd.to_datetime(last_download_date) + pd.Timedelta(days=1)
    yesterday = datetime.now() - pd.Timedelta(days=1)
    # bdate_range defaults to business-day frequency ("B")
    download_dates = pd.bdate_range(first_download_date, yesterday)
    download_dates = download_dates.strftime("%Y-%m-%d")
    return download_dates


def collect_sql_incrementally(
    download_dates: pd.Index, params: Dict[str, Any]
) -> Dict[str, pd.DataFrame]:
    """Downloads all data from the last download date up to yesterday from the
    SQL database, returning it as partitions for the IncrementalDataSet.
        Inputs:
            download_dates (pd.Index): List of dates to download
            params (Dict): Parameters containing the sql_string template
        Returns (Dict):
            Newly downloaded data, keyed by date, saved to the IncrementalDataSet.
        Link:
            https://www.youtube.com/watch?v=v7JSSiYgqpg
    """
    # Read the database connection string straight from conf/local credentials
    conf_paths = ["conf/local"]
    conf_loader = ConfigLoader(conf_paths)
    con = conf_loader.get("credentials*")["sql_connection"]["con"]
    sql_template = params["sql_string"]

    parts = {}
    pbar = ProgressBar()
    for download_date in pbar(download_dates):
        # Insert the current date into the query template and run it
        sql_string = sql_template.replace("insert_date_here", download_date)
        df = pd.read_sql(sql_string, con, parse_dates=["VALUE_TIME"])
        if len(df.index) > 0:
            parts[download_date] = df
    return parts

And then my pipeline looks like this:

from kedro.pipeline import Pipeline, node

pipeline = Pipeline(
    [
        node(
            func=find_download_dates,
            inputs="last_download_date",
            outputs="download_dates",
        ),
        node(
            func=collect_sql_incrementally,
            inputs=["download_dates", "parameters"],
            outputs="incremental_sql",
        ),
    ]
)

As you can see, the un-kedro thing here is using pd.read_sql inside a node, but I haven't found anything better so far. I have often thought about looking into a custom dataset for incremental SQL and API queries, where one would only need to provide an additional start/end date parameter and a granularity parameter defining the chunk size of each increment. However, what I quickly noticed is that APIs and SQL strings vary far too much: sometimes you use the params “start/end” and other times you have “from/to”. If anyone has ideas on how to get around this and how an incremental SQL/API dataset could be created, I would love to hear your input!
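
For what it is worth, here is a very rough, untested sketch of the kind of dataset I have in mind. Everything about the interface (the sql/con/start/end/freq arguments and the {start}/{end} placeholders) is invented for illustration; only the AbstractDataSet base class is real Kedro (called AbstractDataset in newer versions):

# Rough sketch only: the constructor arguments are invented, not an existing Kedro API.
from typing import Any, Dict

import pandas as pd
from kedro.io import AbstractDataSet  # AbstractDataset in newer Kedro versions


class IncrementalSQLDataSet(AbstractDataSet):
    def __init__(self, sql: str, con: str, start: str, end: str, freq: str = "D"):
        self._sql = sql      # query template containing {start} and {end} placeholders
        self._con = con      # SQLAlchemy connection string
        self._start = start  # first date to load
        self._end = end      # last date to load
        self._freq = freq    # granularity (chunk size) of each increment

    def _load(self) -> Dict[str, pd.DataFrame]:
        parts = {}
        dates = pd.date_range(self._start, self._end, freq=self._freq)
        for left, right in zip(dates[:-1], dates[1:]):
            # Render the template for this increment and query it
            sql = self._sql.format(start=left.date(), end=right.date())
            df = pd.read_sql(sql, self._con)
            if not df.empty:
                parts[str(left.date())] = df
        return parts

    def _save(self, data: Any) -> None:
        raise NotImplementedError("This sketch is read-only")

    def _describe(self) -> Dict[str, Any]:
        return {"sql": self._sql, "start": self._start, "end": self._end, "freq": self._freq}

The placeholder/format idea is one possible way around the “start/end” vs. “from/to” problem: the query template itself decides what the placeholders are called, and the dataset only fills them in.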

Hope this helps!
