PCF
The PCF service provides API operations for Product Carbon Footprint (PCF) data.
📄️ InsertFacts
Inserts facts in batch as a long-running operation.
📄️ UpdateFlows
Creates or updates flows in batch as a long-running operation.
📄️ UpdateLinks
Creates or updates links in batch as a long-running operation.
📄️ UpdateUnitProcesses
Creates or updates unit processes in batch as a long-running operation.
Code Examples
Python SDK
Link: https://pypi.org/project/glassdome-waypoint-sdk/
"""
Example: run the PCF pipeline - upsert master data and insert facts.
Usage:
export WAYPOINT_BASE_URL="https://waypoint.glassdome.dev"
export WAYPOINT_API_KEY="your-api-key"
export WAYPOINT_SITE_ID="your-site-id"
python pcf.py
"""
import os
import glassdome_waypoint_sdk.v1beta1.types.pcf as pcf
import glassdome_waypoint_sdk.v1beta1.types.product as product
from glassdome_waypoint_sdk import WaypointClient, WaypointConfig
from glassdome_waypoint_sdk.v1beta1.types import FieldMask, UnitOfMeasure
class PCFPipeline:
    """Example PCF pipeline: upsert master data, then insert facts.

    ``run`` first creates or updates the product, unit processes, flows, and
    the links between them (``_upsert_master_data``), and then inserts the
    monthly facts recorded against those links (``_insert_facts``).
    """

    # Fields written on every link upsert (shared by all link requests).
    _LINK_MASK_PATHS = (
        "unit_process_id",
        "component_id",
        "component_kind",
        "direction",
    )

    def __init__(self, client: WaypointClient, site_id: str):
        """Create the pipeline.

        Args:
            client: Waypoint client used for every API call.
            site_id: Site that the product, unit processes, flows, and facts
                are registered under.
        """
        self._client = client
        self._site_id = site_id
        # You may want to connect the source of the data such as CSV, MES, ERP, etc.
        # self._mes = mes
        # You may want to manage the master data in separate modules
        # such as products.py, unit_processes.py, flows.py, etc.
        self._prod_a_id = "prod-a"
        self._up_a_id = "up-a"
        self._up_b_id = "up-b"
        self._flow_raw_material_id = "flow-raw-material"
        self._flow_electricity_id = "flow-electricity"
        self._flow_waste_id = "flow-waste"
        self._flow_semi_product_id = "flow-semi-product"
        self._flow_ancillary_material_id = "flow-ancillary-material"

    def run(self):
        """Upsert the master data, then insert the facts that reference it."""
        self._upsert_master_data()
        self._insert_facts()

    @staticmethod
    def _raise_if_failed(op, action: str) -> None:
        """Raise ``RuntimeError`` if the finished operation ``op`` reports an error.

        Args:
            op: Operation returned by a batch call; call this after
                ``op.wait()``, as the rest of this class does.
            action: Message prefix such as ``"Products upsert"``.

        Raises:
            RuntimeError: If ``op.error()`` returns an error.
        """
        if err := op.error():
            raise RuntimeError(
                f"{action} failed: Error code: {err.code}, message: {err.message}"
            )

    def _link_specs(self) -> list[tuple]:
        """Return the links as ``(unit_process_id, component_id, component_kind, direction)``."""
        flow = pcf.Component.COMPONENT_FLOW
        product_kind = pcf.Component.COMPONENT_PRODUCT
        inbound = pcf.Direction.DIRECTION_INPUT
        outbound = pcf.Direction.DIRECTION_OUTPUT
        return [
            # Process A: raw material and electricity in; waste and the
            # semi product out.
            (self._up_a_id, self._flow_raw_material_id, flow, inbound),
            (self._up_a_id, self._flow_electricity_id, flow, inbound),
            (self._up_a_id, self._flow_waste_id, flow, outbound),
            (self._up_a_id, self._flow_semi_product_id, flow, outbound),
            # Process B: electricity, the semi product, and ancillary
            # material in; waste and product A out.
            (self._up_b_id, self._flow_electricity_id, flow, inbound),
            (self._up_b_id, self._flow_waste_id, flow, outbound),
            (self._up_b_id, self._flow_semi_product_id, flow, inbound),
            (self._up_b_id, self._flow_ancillary_material_id, flow, inbound),
            (self._up_b_id, self._prod_a_id, product_kind, outbound),
        ]

    def _upsert_master_data(self):
        """Upsert the product, unit processes, and flows, then the links between them.

        Raises:
            RuntimeError: If any upsert operation reports an error.
        """
        print("Upserting products...")
        # Upsert products
        upsert_products_op = self._client.product.update_products(
            [
                product.UpdateProductRequest(
                    product=product.Product(
                        id=self._prod_a_id,
                        name="Product A",
                        sku="sku-a",
                        status=product.Status.STATUS_ACTIVE,
                        uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
                        site_ids=[self._site_id],
                    ),
                    update_mask=FieldMask(
                        paths=["name", "sku", "status", "uom", "site_ids"]
                    ),
                    allow_missing=True,
                ),
            ]
        )
        print("Upserting unit processes...")
        # Upsert unit processes
        upsert_unit_processes_op = self._client.pcf.update_unit_processes(
            [
                pcf.UpdateUnitProcessRequest(
                    unit_process=pcf.UnitProcess(
                        id=self._up_a_id,
                        site_id=self._site_id,
                        name="Process A",
                    ),
                    update_mask=FieldMask(paths=["name"]),
                    allow_missing=True,
                ),
                pcf.UpdateUnitProcessRequest(
                    unit_process=pcf.UnitProcess(
                        id=self._up_b_id,
                        site_id=self._site_id,
                        name="Process B",
                    ),
                    update_mask=FieldMask(paths=["name"]),
                    allow_missing=True,
                ),
            ]
        )
        print("Upserting flows...")
        # Upsert flows
        upsert_flows_op = self._client.pcf.update_flows(
            [
                pcf.UpdateFlowRequest(
                    flow=pcf.Flow(
                        id=self._flow_raw_material_id,
                        site_id=self._site_id,
                        name="Raw Material",
                        uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
                        in_category=pcf.InCategory.IN_CATEGORY_RAW_MATERIAL,
                    ),
                    update_mask=FieldMask(paths=["name", "uom", "in_category"]),
                    allow_missing=True,
                ),
                pcf.UpdateFlowRequest(
                    flow=pcf.Flow(
                        id=self._flow_electricity_id,
                        site_id=self._site_id,
                        name="Electricity",
                        uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOWATT_HOUR,
                        in_category=pcf.InCategory.IN_CATEGORY_ENERGY,
                    ),
                    update_mask=FieldMask(paths=["name", "uom", "in_category"]),
                    allow_missing=True,
                ),
                pcf.UpdateFlowRequest(
                    flow=pcf.Flow(
                        id=self._flow_waste_id,
                        site_id=self._site_id,
                        name="Waste",
                        uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
                        out_category=pcf.OutCategory.OUT_CATEGORY_WASTE,
                    ),
                    update_mask=FieldMask(paths=["name", "uom", "out_category"]),
                    allow_missing=True,
                ),
                pcf.UpdateFlowRequest(
                    flow=pcf.Flow(
                        id=self._flow_semi_product_id,
                        site_id=self._site_id,
                        name="Semi Product",
                        uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
                        in_category=pcf.InCategory.IN_CATEGORY_RAW_MATERIAL,
                        out_category=pcf.OutCategory.OUT_CATEGORY_SEMI_PRODUCT,
                    ),
                    update_mask=FieldMask(
                        paths=["name", "uom", "in_category", "out_category"]
                    ),
                    allow_missing=True,
                ),
                pcf.UpdateFlowRequest(
                    flow=pcf.Flow(
                        id=self._flow_ancillary_material_id,
                        site_id=self._site_id,
                        name="Ancillary Material",
                        uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
                        in_category=pcf.InCategory.IN_CATEGORY_ANCILLARY_MATERIAL,
                    ),
                    update_mask=FieldMask(paths=["name", "uom", "in_category"]),
                    allow_missing=True,
                ),
            ]
        )
        # Products, unit processes, and flows don't depend on each other,
        # so they can be updated concurrently.
        upsert_products_op.wait()
        upsert_unit_processes_op.wait()
        upsert_flows_op.wait()
        self._raise_if_failed(upsert_products_op, "Products upsert")
        self._raise_if_failed(upsert_unit_processes_op, "Unit processes upsert")
        self._raise_if_failed(upsert_flows_op, "Flows upsert")

        print("Upserting links...")
        # Links refer to the unit processes, flows, and product above by ID,
        # so they are upserted only after those upserts have succeeded.
        upsert_links_op = self._client.pcf.update_links(
            [
                pcf.UpdateLinkRequest(
                    link=pcf.Link(
                        unit_process_id=unit_process_id,
                        component_id=component_id,
                        component_kind=component_kind,
                        direction=direction,
                    ),
                    update_mask=FieldMask(paths=list(self._LINK_MASK_PATHS)),
                    allow_missing=True,
                )
                for (
                    unit_process_id,
                    component_id,
                    component_kind,
                    direction,
                ) in self._link_specs()
            ]
        )
        # Wait on the operation handle and check that same handle, as done for
        # the operations above, instead of relying on what wait() returns.
        upsert_links_op.wait()
        self._raise_if_failed(upsert_links_op, "Links upsert")

    def _build_facts(self) -> list[pcf.Fact]:
        """Return the facts to insert: one per month and link."""
        # Assume the facts are built from the sources of the data
        # connected to the pipeline
        print("Building facts...")
        flow = pcf.Component.COMPONENT_FLOW
        product_kind = pcf.Component.COMPONENT_PRODUCT
        inbound = pcf.Direction.DIRECTION_INPUT
        outbound = pcf.Direction.DIRECTION_OUTPUT
        up_a, up_b = self._up_a_id, self._up_b_id
        # month -> [(unit_process_id, component_id, component_kind, direction, amount)]
        rows_by_month = {
            "202501": [
                (up_a, self._flow_raw_material_id, flow, inbound, 100),
                (up_a, self._flow_electricity_id, flow, inbound, 100),
                (up_a, self._flow_waste_id, flow, outbound, 10),
                (up_a, self._flow_semi_product_id, flow, outbound, 90),
                (up_b, self._flow_electricity_id, flow, inbound, 50),
                (up_b, self._flow_waste_id, flow, outbound, 5),
                (up_b, self._flow_semi_product_id, flow, inbound, 90),
                (up_b, self._flow_ancillary_material_id, flow, inbound, 80),
                (up_b, self._prod_a_id, product_kind, outbound, 85),
            ],
            "202502": [
                (up_a, self._flow_raw_material_id, flow, inbound, 200),
                (up_a, self._flow_electricity_id, flow, inbound, 200),
                (up_a, self._flow_waste_id, flow, outbound, 20),
                (up_a, self._flow_semi_product_id, flow, outbound, 180),
                (up_b, self._flow_electricity_id, flow, inbound, 100),
                (up_b, self._flow_waste_id, flow, outbound, 10),
                (up_b, self._flow_semi_product_id, flow, inbound, 180),
                (up_b, self._flow_ancillary_material_id, flow, inbound, 160),
                (up_b, self._prod_a_id, product_kind, outbound, 170),
            ],
        }
        return [
            pcf.Fact(
                month=month,
                site_id=self._site_id,
                unit_process_id=unit_process_id,
                component_id=component_id,
                component_kind=component_kind,
                direction=direction,
                amount=amount,
            )
            for month, rows in rows_by_month.items()
            for unit_process_id, component_id, component_kind, direction, amount in rows
        ]

    def _insert_facts(self):
        """Build the facts and insert them, waiting for the operation to finish.

        Raises:
            RuntimeError: If the insert operation reports an error.
        """
        facts = self._build_facts()
        print("Inserting facts...")
        # Insert facts
        insert_facts_op = self._client.pcf.insert_facts(facts)
        insert_facts_op.wait()
        self._raise_if_failed(insert_facts_op, "Facts insert")
def main():
    """Run the PCF pipeline using settings read from the environment.

    Required environment variables: ``WAYPOINT_BASE_URL``, ``WAYPOINT_API_KEY``,
    and ``WAYPOINT_SITE_ID``. If any of them is unset or empty, the process
    exits with a "<NAME> is not set" message.
    """
    # sys.exit rather than the site-provided exit(), which is intended for the
    # interactive interpreter and is missing when Python runs with -S.
    import sys

    def require_env(name: str) -> str:
        value = os.getenv(name)
        if not value:
            sys.exit(f"{name} is not set")
        return value

    base_url = require_env("WAYPOINT_BASE_URL")
    api_key = require_env("WAYPOINT_API_KEY")
    site_id = require_env("WAYPOINT_SITE_ID")
    client = WaypointClient.from_api_key(WaypointConfig(base_url=base_url), api_key)
    PCFPipeline(client, site_id).run()


if __name__ == "__main__":
    main()
Airflow Provider
Link: https://pypi.org/project/airflow-providers-glassdome-waypoint/
"""
Example: run the PCF pipeline - upsert master data, build facts, and insert facts.
This example assumes that the Glassdome Waypoint connection has already been created
with the connection ID "glassdome_waypoint_default".
You can create a connection with a different connection ID and use it for the hook.
This example also assumes that the PCF site ID has been stored in the Airflow variable "pcf_site_id".
"""
import logging
from datetime import timedelta
from enum import Enum
from typing import Any
import glassdome_waypoint_sdk.v1beta1.types.pcf as pcf
import glassdome_waypoint_sdk.v1beta1.types.product as product
from airflow.exceptions import AirflowFailException
from airflow.sdk import dag, task
from glassdome_waypoint.hooks.waypoint import WaypointHook
from glassdome_waypoint_sdk.v1beta1.types import FieldMask, UnitOfMeasure
# You may want to manage the master data in separate modules
# such as products.py, unit_processes.py, flows.py, etc.
class ID(Enum):
    """IDs of the example master data: one product, two unit processes, five flows.

    The tasks in this DAG pass ``member.value`` (the string ID) to the
    Waypoint API.
    """

    # Product
    PRODUCT_A = "prod-a"
    # Unit processes
    UNIT_PROCESS_A = "up-a"
    UNIT_PROCESS_B = "up-b"
    # Flows
    FLOW_RAW_MATERIAL = "flow-raw-material"
    FLOW_ELECTRICITY = "flow-electricity"
    FLOW_WASTE = "flow-waste"
    FLOW_SEMI_PRODUCT = "flow-semi-product"
    FLOW_ANCILLARY_MATERIAL = "flow-ancillary-material"
# Fields written on every link upsert (shared by all link requests).
_LINK_UPDATE_PATHS = (
    "unit_process_id",
    "component_id",
    "component_kind",
    "direction",
)


def _raise_if_failed(op, action: str) -> None:
    """Raise ``AirflowFailException`` if the finished operation ``op`` reports an error.

    Args:
        op: Operation returned by a batch call; call this after ``op.wait()``.
        action: Message prefix such as ``"Products upsert"``.

    Raises:
        AirflowFailException: If ``op.error()`` returns an error.
    """
    if err := op.error():
        raise AirflowFailException(
            f"{action} failed: Error code: {err.code}, message: {err.message}"
        )


def _link_request(unit_process, component, component_kind, direction):
    """Build an upsert request for the link between ``unit_process`` and ``component``.

    Args:
        unit_process: ``ID`` member of the unit process.
        component: ``ID`` member of the linked flow or product.
        component_kind: ``pcf.Component`` value describing ``component``.
        direction: ``pcf.Direction`` value of the link.
    """
    return pcf.UpdateLinkRequest(
        link=pcf.Link(
            unit_process_id=unit_process.value,
            component_id=component.value,
            component_kind=component_kind,
            direction=direction,
        ),
        update_mask=FieldMask(paths=list(_LINK_UPDATE_PATHS)),
        allow_missing=True,
    )


@task
def upsert_master_data(conn_id: str, site_id: str):
    """Upsert the product, unit processes, and flows, then the links between them.

    Args:
        conn_id: Airflow connection ID passed to ``WaypointHook``.
        site_id: Site that the master data is registered under.

    Raises:
        AirflowFailException: If any upsert operation reports an error.
    """
    client = WaypointHook(conn_id).get_client()
    logging.info("Upserting products...")
    # Upsert products
    upsert_products_op = client.product.update_products(
        [
            product.UpdateProductRequest(
                product=product.Product(
                    id=ID.PRODUCT_A.value,
                    name="Product A",
                    sku="sku-a",
                    status=product.Status.STATUS_ACTIVE,
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
                    site_ids=[site_id],
                ),
                update_mask=FieldMask(
                    paths=["name", "sku", "status", "uom", "site_ids"]
                ),
                allow_missing=True,
            ),
        ]
    )
    logging.info("Upserting unit processes...")
    # Upsert unit processes
    upsert_unit_processes_op = client.pcf.update_unit_processes(
        [
            pcf.UpdateUnitProcessRequest(
                unit_process=pcf.UnitProcess(
                    id=ID.UNIT_PROCESS_A.value,
                    site_id=site_id,
                    name="Process A",
                ),
                update_mask=FieldMask(paths=["name"]),
                allow_missing=True,
            ),
            pcf.UpdateUnitProcessRequest(
                unit_process=pcf.UnitProcess(
                    id=ID.UNIT_PROCESS_B.value,
                    site_id=site_id,
                    name="Process B",
                ),
                update_mask=FieldMask(paths=["name"]),
                allow_missing=True,
            ),
        ]
    )
    logging.info("Upserting flows...")
    # Upsert flows
    upsert_flows_op = client.pcf.update_flows(
        [
            pcf.UpdateFlowRequest(
                flow=pcf.Flow(
                    id=ID.FLOW_RAW_MATERIAL.value,
                    site_id=site_id,
                    name="Raw Material",
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
                    in_category=pcf.InCategory.IN_CATEGORY_RAW_MATERIAL,
                ),
                update_mask=FieldMask(paths=["name", "uom", "in_category"]),
                allow_missing=True,
            ),
            pcf.UpdateFlowRequest(
                flow=pcf.Flow(
                    id=ID.FLOW_ELECTRICITY.value,
                    site_id=site_id,
                    name="Electricity",
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOWATT_HOUR,
                    in_category=pcf.InCategory.IN_CATEGORY_ENERGY,
                ),
                update_mask=FieldMask(paths=["name", "uom", "in_category"]),
                allow_missing=True,
            ),
            pcf.UpdateFlowRequest(
                flow=pcf.Flow(
                    id=ID.FLOW_WASTE.value,
                    site_id=site_id,
                    name="Waste",
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
                    out_category=pcf.OutCategory.OUT_CATEGORY_WASTE,
                ),
                update_mask=FieldMask(paths=["name", "uom", "out_category"]),
                allow_missing=True,
            ),
            pcf.UpdateFlowRequest(
                flow=pcf.Flow(
                    id=ID.FLOW_SEMI_PRODUCT.value,
                    site_id=site_id,
                    name="Semi Product",
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
                    in_category=pcf.InCategory.IN_CATEGORY_RAW_MATERIAL,
                    out_category=pcf.OutCategory.OUT_CATEGORY_SEMI_PRODUCT,
                ),
                update_mask=FieldMask(
                    paths=["name", "uom", "in_category", "out_category"]
                ),
                allow_missing=True,
            ),
            pcf.UpdateFlowRequest(
                flow=pcf.Flow(
                    id=ID.FLOW_ANCILLARY_MATERIAL.value,
                    site_id=site_id,
                    name="Ancillary Material",
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
                    in_category=pcf.InCategory.IN_CATEGORY_ANCILLARY_MATERIAL,
                ),
                update_mask=FieldMask(paths=["name", "uom", "in_category"]),
                allow_missing=True,
            ),
        ]
    )
    # Products, unit processes, and flows don't depend on each other,
    # so they can be updated concurrently.
    upsert_products_op.wait()
    upsert_unit_processes_op.wait()
    upsert_flows_op.wait()
    _raise_if_failed(upsert_products_op, "Products upsert")
    _raise_if_failed(upsert_unit_processes_op, "Unit processes upsert")
    _raise_if_failed(upsert_flows_op, "Flows upsert")

    logging.info("Upserting links...")
    # Links refer to the unit processes, flows, and product above by ID,
    # so they are upserted only after those upserts have succeeded.
    flow = pcf.Component.COMPONENT_FLOW
    inbound = pcf.Direction.DIRECTION_INPUT
    outbound = pcf.Direction.DIRECTION_OUTPUT
    upsert_links_op = client.pcf.update_links(
        [
            # Process A: raw material and electricity in; waste and the
            # semi product out.
            _link_request(ID.UNIT_PROCESS_A, ID.FLOW_RAW_MATERIAL, flow, inbound),
            _link_request(ID.UNIT_PROCESS_A, ID.FLOW_ELECTRICITY, flow, inbound),
            _link_request(ID.UNIT_PROCESS_A, ID.FLOW_WASTE, flow, outbound),
            _link_request(ID.UNIT_PROCESS_A, ID.FLOW_SEMI_PRODUCT, flow, outbound),
            # Process B: electricity, the semi product, and ancillary
            # material in; waste and product A out.
            _link_request(ID.UNIT_PROCESS_B, ID.FLOW_ELECTRICITY, flow, inbound),
            _link_request(ID.UNIT_PROCESS_B, ID.FLOW_WASTE, flow, outbound),
            _link_request(ID.UNIT_PROCESS_B, ID.FLOW_SEMI_PRODUCT, flow, inbound),
            _link_request(
                ID.UNIT_PROCESS_B, ID.FLOW_ANCILLARY_MATERIAL, flow, inbound
            ),
            _link_request(
                ID.UNIT_PROCESS_B,
                ID.PRODUCT_A,
                pcf.Component.COMPONENT_PRODUCT,
                outbound,
            ),
        ]
    )
    # Wait on the operation handle and check that same handle, as done for
    # the operations above, instead of relying on what wait() returns.
    upsert_links_op.wait()
    _raise_if_failed(upsert_links_op, "Links upsert")
@task
def build_facts(site_id: str) -> list[dict[str, Any]]:
    """Build the monthly facts and return them as dicts for XCom.

    Args:
        site_id: Site that the facts are recorded for.

    Returns:
        One ``MessageToDict`` dict per ``pcf.Fact``, ordered by month and then
        by link.
    """
    # Assume the facts are built from the sources of the data connected to the pipeline.
    from google.protobuf.json_format import MessageToDict

    logging.info("Building facts...")
    flow = pcf.Component.COMPONENT_FLOW
    product_kind = pcf.Component.COMPONENT_PRODUCT
    inbound = pcf.Direction.DIRECTION_INPUT
    outbound = pcf.Direction.DIRECTION_OUTPUT
    up_a, up_b = ID.UNIT_PROCESS_A, ID.UNIT_PROCESS_B
    # month -> [(unit process, component, component kind, direction, amount)]
    rows_by_month = {
        "202501": [
            (up_a, ID.FLOW_RAW_MATERIAL, flow, inbound, 100),
            (up_a, ID.FLOW_ELECTRICITY, flow, inbound, 100),
            (up_a, ID.FLOW_WASTE, flow, outbound, 10),
            (up_a, ID.FLOW_SEMI_PRODUCT, flow, outbound, 90),
            (up_b, ID.FLOW_ELECTRICITY, flow, inbound, 50),
            (up_b, ID.FLOW_WASTE, flow, outbound, 5),
            (up_b, ID.FLOW_SEMI_PRODUCT, flow, inbound, 90),
            (up_b, ID.FLOW_ANCILLARY_MATERIAL, flow, inbound, 80),
            (up_b, ID.PRODUCT_A, product_kind, outbound, 85),
        ],
        "202502": [
            (up_a, ID.FLOW_RAW_MATERIAL, flow, inbound, 200),
            (up_a, ID.FLOW_ELECTRICITY, flow, inbound, 200),
            (up_a, ID.FLOW_WASTE, flow, outbound, 20),
            (up_a, ID.FLOW_SEMI_PRODUCT, flow, outbound, 180),
            (up_b, ID.FLOW_ELECTRICITY, flow, inbound, 100),
            (up_b, ID.FLOW_WASTE, flow, outbound, 10),
            (up_b, ID.FLOW_SEMI_PRODUCT, flow, inbound, 180),
            (up_b, ID.FLOW_ANCILLARY_MATERIAL, flow, inbound, 160),
            (up_b, ID.PRODUCT_A, product_kind, outbound, 170),
        ],
    }
    facts = [
        pcf.Fact(
            month=month,
            site_id=site_id,
            unit_process_id=unit_process.value,
            component_id=component.value,
            component_kind=component_kind,
            direction=direction,
            amount=amount,
        )
        for month, rows in rows_by_month.items()
        for unit_process, component, component_kind, direction, amount in rows
    ]
    # Convert the facts to dicts for the XCom serialization
    return [MessageToDict(fact) for fact in facts]
@task
def insert_facts(conn_id: str, **context):
    """Insert the facts returned by the ``build_facts`` task.

    Args:
        conn_id: Airflow connection ID passed to ``WaypointHook``.
        **context: Airflow task context; its ``task_instance`` is used to pull
            the ``build_facts`` return value from XCom.

    Raises:
        AirflowFailException: If no facts are found in XCom or the insert
            operation reports an error.
    """
    from google.protobuf.json_format import ParseDict

    client = WaypointHook(conn_id).get_client()
    # Pull the facts from the build_facts task result
    xcom_facts = context["task_instance"].xcom_pull(task_ids="build_facts")
    if xcom_facts is None:
        # xcom_pull returns None when no XCom value is found; fail with a clear
        # message instead of a TypeError while iterating below.
        raise AirflowFailException("No facts found in the XCom of task build_facts")
    # Convert the dicts back to protobuf messages
    facts = [ParseDict(xcom_fact, pcf.Fact()) for xcom_fact in xcom_facts]
    logging.info("Inserting facts...")
    # Insert facts and wait for completion. Check the operation handle itself
    # rather than relying on what wait() returns.
    insert_facts_op = client.pcf.insert_facts(facts)
    insert_facts_op.wait()
    if err := insert_facts_op.error():
        raise AirflowFailException(
            f"Facts insert failed: Error code: {err.code}, message: {err.message}"
        )
@dag(
    dag_id="example_pcf",
    description="PCF pipeline - upsert master data, build facts, and insert facts",
    schedule=None,
    # NOTE(review): a task retry waits retry_delay (1 minute) after a failure,
    # so it cannot start before this 1-minute dagrun_timeout ends the run;
    # tasks that raise AirflowFailException are not retried at all. Consider a
    # timeout that fits the expected duration of the long-running operations.
    dagrun_timeout=timedelta(minutes=1),
    tags=["examples"],
    default_args={
        "owner": "Glassdome",
        "retries": 10,
        "retry_delay": timedelta(minutes=1),
    },
)
def example_pcf():
    """Wire the PCF tasks: master data and facts are prepared concurrently, then facts are inserted."""
    conn_id = "glassdome_waypoint_default"
    # Read the site ID from the "pcf_site_id" Airflow variable through a Jinja
    # template that is rendered when each task runs (task arguments are
    # templated), instead of calling Variable.get() here, which would query the
    # variable every time the DAG file is parsed.
    site_id = "{{ var.value.pcf_site_id }}"
    task_upsert_master_data = upsert_master_data(conn_id, site_id)
    # You may want to connect the source of the data such as CSV, MES, and ERP
    # and build the facts from the data.
    task_build_facts = build_facts(site_id)
    task_insert_facts = insert_facts(conn_id)
    # Define the task dependencies.
    # task_upsert_master_data and task_build_facts can run concurrently,
    # and both of them must run before task_insert_facts.
    [task_upsert_master_data, task_build_facts] >> task_insert_facts


dag = example_pcf()

if __name__ == "__main__":
    dag.test()