
Product Demand Prediction for an Oil Company


About this project

Background

“Most companies have an IT organization, but they haven’t thought of the possibilities of decoupling the 'I' from the 'T' and managing information and technology as separate assets.” — Doug Laney

The oil and gas industry constantly faces logistical and other operational challenges. Although solutions such as sensors and predictive maintenance help detect faults in pipelines and inefficiencies in transport systems, problems relating to demand and scarcity prediction remain insufficiently addressed. These gaps can, however, be closed by harnessing the power of Big Data analytics and Business Intelligence.

Globally, only about 36% of oil and gas companies have invested in Big Data analytics, and just 13% use it to deliver enhanced business intelligence. These figures show that most companies have yet to fully embed Big Data technology into their systems and business processes.

A client in the oil and gas industry recognized the importance of Big Data analytics and requested a system that could predict product demand and scarcity. The client needed to optimize its supply chain and understand the consumption patterns of its affiliate stations. In essence, the business needed to make its operations efficient and ensure that every station had sufficient supply at any given time.

Project's Objectives

  • Deliver product demand and scarcity forecasting dashboards.
  • Help the client understand and make effective use of their data.

Tech Stack

  • Azure Data Factory.
  • Azure Databricks.
  • Azure Synapse Analytics.
  • Azure Blob Storage.
  • Microsoft Power BI.

Solution

The Product Demand use case followed the bronze–silver–gold (medallion) architecture. The process begins with loading data from the different sources into the landing path of the Data Lake; the files are then converted to Delta tables at the bronze level, transformed at the silver level, used to build the feature store and run the models at the gold level, and finally aggregated in Azure Synapse Analytics and visualized in Power BI.
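
As a minimal sketch of how these layers map onto the data lake (the container and storage-account variables are assumed to be configured elsewhere), each layer is simply a folder prefix on the same Blob Storage container:

# illustrative layout of the medallion layers on the data lake (names assumed)
container_var = f"wasbs://{container_name}@{storage_account_name}/"

landing_path = f"{container_var}landing/product_demand/"  # raw CSV files as received
bronze_path  = f"{container_var}bronze/product_demand/"   # Delta tables, loaded as is
silver_path  = f"{container_var}silver/product_demand/"   # cleaned and transformed data
gold_path    = f"{container_var}gold/product_demand/"     # feature store and model outputs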

Landing Path

The client provided data from various sources and in various formats. I converted the files to a common format (CSV) and loaded them into the data lake, which was built on Azure Blob Storage.
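
The upload itself is not shown in this write-up; a minimal sketch using the azure-storage-blob SDK (the connection string, container name, and local folder are assumptions) could look like this:

# minimal sketch (assumed names): upload the converted CSV files to the landing area
import os
from azure.storage.blob import BlobServiceClient

service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = service_client.get_container_client(container_name)

local_folder = "converted_csv"  # hypothetical local folder holding the converted CSV files
for file_name in os.listdir(local_folder):
    blob_name = f"landing/product_demand/{file_name}"
    with open(os.path.join(local_folder, file_name), "rb") as data:
        container_client.upload_blob(name=blob_name, data=data, overwrite=True)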

Landing to Bronze

Once all the files were in the data lake, we ran a Python (PySpark) script that read the CSV files, converted them into Delta tables, and wrote the data to the bronze path. At this stage, no transformation occurred; the data was loaded as is.

# imports (input_file_name tags each record with the file it came from)
from pyspark.sql.functions import input_file_name

# define paths
container_var = f"wasbs://{container_name}@{storage_account_name}/"
bronze_path = f"{container_var}bronze/product_demand/###/"

----command----

# load landing paths: read each day's CSV and collect the daily DataFrames
##_df_arr = []
dates = generate_date(start_date, end_date, "%Y%m%d")  # helper returning each date in the range
for day in dates:
    landing_path = f"{container_var}landing/product_demand/##/{day}/##.csv"
    try:
        each_day_df = (spark.read.format("csv")
                            .option("header", True)
                            .option("inferSchema", "true")
                            .option("sep", ",")
                            .load(landing_path)
                            .withColumn("FILE_NAME", input_file_name()))
        ##_df_arr.append(each_day_df)
    except Exception:
        print(f"The file {landing_path} does not exist")

Bronze to Silver

On the silver path, we applied a number of transformations based on the business processes and requirements. These included removing unwanted columns, filtering out non-affiliate stations, and calculating metrics such as delayed deliveries, scarcity, and station volumes. From this data we eventually built the feature store and wrote it to the gold level.

# imports (fsum is Spark's aggregate sum, aliased to avoid shadowing Python's built-in sum)
from pyspark.sql.functions import sum as fsum

# define paths
container_var = f"wasbs://{container_name}@{storage_account_name}/"
bronze_path = f"{container_var}bronze/product_demand/##/"
silver_path = f"{container_var}silver/product_demand/##/"

----command----
# daily sales value and volume per station, used to identify the stations with the highest volume sold
daily_volume_product_sold_df = (##_df.groupBy("STATION", "METER_DESCRIPTION", "TYPE", "SALES_DATE")
                                     .agg(fsum("SALES_VALUE").alias("TOTAL_DAILY_SALES_VALUE"),
                                          fsum("VOLUME").alias("TOTAL_DAILY_VOLUME_PRODUCT_SOLD")))
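
The column removal, affiliate filter, and silver write mentioned above are not shown in the snippet; a minimal sketch with hypothetical column names (UNUSED_COLUMN, STATION_TYPE) and assumed sub-folders under the silver path would be:

# minimal sketch (hypothetical column names): drop unwanted columns and keep only affiliate stations
from pyspark.sql.functions import col

cleaned_df = (##_df.drop("UNUSED_COLUMN")                       # hypothetical unwanted column
                   .where(col("STATION_TYPE") == "AFFILIATE"))   # hypothetical affiliate indicator

# write the transformed data and the daily volume metric as Delta tables on the silver path
cleaned_df.write.format("delta").mode("overwrite").save(f"{silver_path}cleaned/")
daily_volume_product_sold_df.write.format("delta").mode("overwrite").save(f"{silver_path}daily_volume/")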

Gold Path

Data written to the gold path is considered the feature store. The feature store contains the different metrics we extracted from the data, along with additional columns derived from it; for instance, columns showing product delay dates, delivery duration, station volumes, and tank capacities all speak to the operational efficiency of each station. We then modeled the data from the feature store to forecast metrics such as sales volumes, delivery duration, and scarcity, as presented on the forecasting dashboard. For the forecasting we used Facebook Prophet, an open-source time-series forecasting library from Meta (Facebook) whose API follows the familiar fit/predict pattern.

# imports (Prophet for per-station time-series forecasting)
from prophet import Prophet

# fit one Prophet model per station and collect the 7-day forecasts
loaded_stations_df = []
for station in range(stations.shape[0]):   # stations is a pandas DataFrame of station names
    try:
        station_name = stations['STATION'].iloc[station]
        sales_df = ##_raw_df.select('ds', 'y').where(f"STATION = '{station_name}'")
        my_model = Prophet(interval_width=0.95)
        # convert to pandas because Prophet only works with pandas DataFrames
        pandas_sales_df = sales_df.toPandas()
        my_model.fit(pandas_sales_df)
        future_dates = my_model.make_future_dataframe(periods=7, freq='D', include_history=True)
        forecast = my_model.predict(future_dates)
        final_forecast = forecast[['ds', 'yhat']].copy()
        final_forecast['STATION'] = station_name
        final_forecast_spark = spark.createDataFrame(final_forecast)
        # join back the actuals so forecast and observed values can be compared downstream
        final_forecast_spark = final_forecast_spark.join(sales_df, ['ds'])
        loaded_stations_df.append(final_forecast_spark)
    except Exception as e:
        print(f"An error occurred: {e}")

The forecasted data was then written to Azure Synapse Analytics, where I did further analysis in SQL on the metrics, stored the results as views, and used those views to design the Power BI dashboard.
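
The Synapse write is not shown in this write-up; a minimal sketch using the Azure Synapse connector that ships with Databricks (the JDBC URL, staging directory, and table name are assumptions) could look like this:

# combine the per-station forecasts into a single DataFrame
from functools import reduce
from pyspark.sql import DataFrame

all_forecasts_df = reduce(DataFrame.unionByName, loaded_stations_df)

# minimal sketch (assumed URL, tempDir, and table name): write the forecasts to Azure Synapse Analytics
(all_forecasts_df.write
    .format("com.databricks.spark.sqldw")
    .option("url", synapse_jdbc_url)                         # hypothetical JDBC connection string
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("tempDir", f"{container_var}tempdir/")           # staging area in Blob Storage
    .option("dbTable", "product_demand_forecast")            # hypothetical target table
    .mode("overwrite")
    .save())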

Benefits of the Solution

  • The business identified mismatches and discrepancies in its data.
  • The client was able to pinpoint shortcomings and optimize their supply chain processes for better customer service.
  • The business is now able to determine the consumption patterns of various affiliate stations and predict product shortages.

