Incremental SQL


I want to load a big SQL table into S3. I could use a pandas.SQLQueryDataSet and then an IncrementalDataSet, but the issue with doing this is that the initial query gets very slow because it queries the whole table. I would like to resume the query from the date where the IncrementalDataSet finished last time. Any idea how to do this?


I have yet to get an incremental dataset working correctly myself. I either misunderstand it or my data does not fit the model. It seems simple enough, but every time I try I feel like I can't quite get it right in practice.

Something I have done in the past is generate the chunks by "hand". I create a list of the chunks; this may be splitting the dataset by year, or by some group id. Then I generate a catalog.yml from that list with an entry for each separate group and each layer it goes through, plus a combined dataset. Then I generate nodes off of the same list in a for loop. It's pretty simple and straightforward for my brain: lay it out as if you are doing it for one group, then wrap it in a `for group in groups` loop.
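A minimal sketch of that "chunks by hand" idea: one list of groups drives both the generated catalog entries and the node names. The dataset and file names here are illustrative, not from the original post.

```python
# One list of groups drives everything; here the groups are years
groups = [2019, 2020, 2021]

def catalog_entry(group):
    """Render one catalog.yml entry for a single group (names illustrative)."""
    return (
        f"sales_{group}:\n"
        f"  type: pandas.CSVDataSet\n"
        f"  filepath: data/01_raw/sales_{group}.csv\n"
    )

# Generate the catalog text and the matching node names from the same list
catalog_yml = "\n".join(catalog_entry(g) for g in groups)
node_names = [f"process_sales_{g}" for g in groups]
```

The same `groups` list would then feed a `for group in groups` loop that builds one node per entry, plus a final node combining them.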


I have also had this issue before with both the SQL dataset and the API dataset. I have a working solution based on this example (Finishing our SG API Pipeline with Chronocoding - How I Write Pipes Part V - YouTube)
So here is my example where I download SQL data in business day increments. It is not really the “kedro” way of doing things since I am reading data without using the data catalog…

My data catalog looks like this, where last_download_date is a TextDataSet that reads the CHECKPOINT file created by the IncrementalDataSet. Hint: you will need to manually create this CHECKPOINT file with your desired start date before your first run.

last_download_date:
  type: text.TextDataSet
  filepath: data/01_raw/CHECKPOINT
  layer: raw

# ――――――――― incremental ―――――――――
eclipse:
  type: IncrementalDataSet
  path: data/01_raw/eclipse
  filename_suffix: ".csv"
  dataset: pandas.CSVDataSet
  layer: raw
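Seeding the CHECKPOINT can be a one-off script like the following; the path matches the catalog entry above, but the start date is an arbitrary example you would replace with your own.

```python
from pathlib import Path

# Create the CHECKPOINT file with the desired start date before the first run
checkpoint = Path("data/01_raw/CHECKPOINT")
checkpoint.parent.mkdir(parents=True, exist_ok=True)
checkpoint.write_text("2021-01-01")  # example start date
```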

Then in my parameters.yml (simply because I don't want it cluttering my code), I toss in my SQL string, where the date I want to query for will be inserted wherever "insert_date_here" is written. These are normally the start and end dates of your query.

    SET NOCOUNT ON; SELECT VALUE_TIME,VALUE FROM ts_read('/folder/Value','insert_date_here','insert_date_here','') ORDER BY VALUE_TIME
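The rendering is plain string replacement, one query per date. A tiny sketch with a shortened template (the real query lives in parameters.yml as above):

```python
# Shortened template for illustration; the placeholder works the same way
sql_template = "SELECT VALUE_TIME, VALUE FROM ts WHERE d = 'insert_date_here'"
sql_string = sql_template.replace("insert_date_here", "2021-06-01")
```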

Then, in my data engineering pipeline, I add the following nodes:

import pandas as pd


def find_download_dates(last_download_date: str) -> pd.Index:
    """Based on the last day in the CHECKPOINT file,
    creates a list of all business days until yesterday to download in %Y-%m-%d format.
        Inputs (str):
            last_download_date: Last download date
        Returns (pd.Index):
            download_dates: List of dates to download
    """
    first_download_date = pd.to_datetime(last_download_date) + pd.Timedelta(days=1)
    yesterday = pd.Timestamp.today().normalize() - pd.Timedelta(days=1)
    # default frequency "B" keeps business days only
    download_dates = pd.bdate_range(first_download_date, yesterday)
    download_dates = download_dates.strftime("%Y-%m-%d")
    return download_dates
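For reference, `pd.bdate_range` with its default "B" frequency skips weekends, which is what the business-day download schedule relies on. A quick check with fixed dates (2021-01-01 is a Friday, so the following Saturday and Sunday are excluded):

```python
import pandas as pd

# Business-day range over a span that contains one weekend
dates = pd.bdate_range("2021-01-01", "2021-01-07")
labels = dates.strftime("%Y-%m-%d").tolist()
# weekend days 2021-01-02 and 2021-01-03 are not in the list
```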

from typing import Dict

import pandas as pd
from kedro.config import ConfigLoader
from progressbar import ProgressBar


def collect_sql_incrementally(download_dates: pd.Index, params) -> Dict[str, pd.DataFrame]:
    """Create incremental SQL dataset. Downloads all data from the last download date
    to yesterday from the SQL database.
        Inputs (pd.Index):
            download_dates: List of dates to download
        Returns (Dict):
            Newly downloaded data added to the IncrementalDataSet.
    """
    conf_paths = ["conf/local"]
    conf_loader = ConfigLoader(conf_paths)
    con = conf_loader.get("credentials*")["sql_connection"]["con"]
    sql_template = params["sql_string"]

    parts = {}
    pbar = ProgressBar()
    for download_date in pbar(download_dates):
        sql_string = sql_template.replace("insert_date_here", download_date)
        df = pd.read_sql(sql_string, con, parse_dates=["VALUE_TIME"])
        if len(df.index) > 0:
            parts[f"{download_date}"] = df
    return parts
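The returned dict maps each date to a DataFrame, and the IncrementalDataSet writes one partition per key; dates with no rows are skipped so no empty files appear. A self-contained sketch of that loop, with a stub standing in for pd.read_sql so it runs without a database:

```python
import pandas as pd

def fake_read_sql(date):
    """Stub for pd.read_sql; one date deliberately returns no rows."""
    if date == "2021-01-05":
        return pd.DataFrame()
    return pd.DataFrame({"VALUE_TIME": [date], "VALUE": [1.0]})

parts = {}
for download_date in ["2021-01-04", "2021-01-05", "2021-01-06"]:
    df = fake_read_sql(download_date)
    if len(df.index) > 0:
        parts[download_date] = df
# only the non-empty dates become partitions
```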

And then my pipeline looks like this:

pipeline = Pipeline(
    [
        node(
            find_download_dates,
            inputs="last_download_date",
            outputs="download_dates",
        ),
        node(
            collect_sql_incrementally,
            inputs=["download_dates", "parameters"],
            outputs="eclipse",  # name of the IncrementalDataSet catalog entry
        ),
    ]
)

As you can see, the un-kedro thing here is using pd.read_sql inside a node, but I haven't found anything better so far. I have often thought about looking into a custom dataset for incremental SQL and API queries, where one only needs to provide an additional start/end date parameter and a granularity parameter that would define the chunk size of each increment. However, what I quickly noticed is that API/SQL strings vary far too much. Sometimes you use the params "start/end" and other times you have "from/to". If anyone has any ideas of how to get around this and how an incremental SQL/API dataset could be created, I would love to hear your input!
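One possible way around the "start/end" vs "from/to" problem: let the catalog entry declare a mapping from placeholder names to values, so the dataset itself stays agnostic about what the API calls its parameters. This is only a sketch of the idea, not an existing kedro feature:

```python
def render_query(template, mapping):
    """Fill in whatever placeholder names this particular API/SQL string uses."""
    for placeholder, value in mapping.items():
        template = template.replace(placeholder, value)
    return template

# The placeholder names come from config, so "from/to" would work just as well
query = render_query(
    "SELECT * FROM t WHERE ts >= '{start}' AND ts < '{end}'",
    {"{start}": "2021-01-01", "{end}": "2021-01-02"},
)
```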

Hope this helps!
