Skip to main content

Products

The Products service exposes API operations for product data.

Code Examples

Python SDK

Link: https://pypi.org/project/glassdome-waypoint-sdk/

"""
Example: batch create, update and delete products.

Usage:
export WAYPOINT_BASE_URL="https://waypoint.glassdome.dev"
export WAYPOINT_API_KEY="your-api-key"
python products.py
"""

import os

from glassdome_waypoint_sdk import (
AnyRegistry,
WaypointClient,
WaypointConfig,
)
from glassdome_waypoint_sdk.v1beta1.types import FieldMask, UnitOfMeasure
from glassdome_waypoint_sdk.v1beta1.types.operation import OperationReturnOptions
from glassdome_waypoint_sdk.v1beta1.types.product import (
CreateProductRequest,
DeleteProductRequest,
Product,
Status,
UpdateProductRequest,
)


def _require_env(name):
    """Return the value of environment variable *name*.

    Exits the process with the message ``"<name> is not set"`` when the
    variable is missing or empty.
    """
    # sys.exit is used instead of the site-provided exit() helper, which is
    # intended for the interactive shell and is not guaranteed to exist.
    import sys

    value = os.getenv(name)
    if not value:
        sys.exit(f"{name} is not set")
    return value


def _new_create_requests():
    """Build a fresh list of create requests for the two sample products."""
    return [
        CreateProductRequest(
            product=Product(
                id="prod-001",
                name="pname001",
                sku="sku001",
                status=Status.STATUS_ACTIVE,
                uom=UnitOfMeasure.UNIT_OF_MEASURE_EACH,
            ),
        ),
        CreateProductRequest(
            product=Product(
                id="prod-002",
                name="pname002",
                sku="sku002",
                status=Status.STATUS_ACTIVE,
                uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
            ),
        ),
    ]


def _print_operation_error(op):
    """Print the error of *op*, if it has one.

    Prints the error code and message, then each error detail unpacked with
    ``AnyRegistry.unpack``.

    Returns:
        True when ``op.error()`` was truthy and was printed, False otherwise.
    """
    err = op.error()
    if not err:
        return False
    print(f"Error code: {err.code}, message: {err.message}")
    for detail in err.details:
        print(AnyRegistry.unpack(detail))
    return True


def main():
    """Run the batch create, update and delete product example.

    Reads ``WAYPOINT_BASE_URL`` and ``WAYPOINT_API_KEY`` from the environment
    (exiting if either is unset), then creates two products, creates them a
    second time, updates them, and deletes them. The error of every operation
    is printed; the script keeps going after an operation error.
    """
    base_url = _require_env("WAYPOINT_BASE_URL")
    api_key = _require_env("WAYPOINT_API_KEY")

    client = WaypointClient.from_api_key(WaypointConfig(base_url=base_url), api_key)

    print("Creating products...")
    # Start the create operation, then wait for completion as a separate step
    op = client.product.create_products(requests=_new_create_requests())
    op.wait()
    _print_operation_error(op)

    print("Creating products again...")
    # Create the same products again and wait for completion at the same time
    op = client.product.create_products(requests=_new_create_requests()).wait()
    _print_operation_error(op)

    print("Updating products...")
    # Update products and wait for completion with response
    op = client.product.update_products(
        requests=[
            UpdateProductRequest(
                product=Product(
                    id="prod-001",
                    name="pname001-updated",
                    sku="sku001-updated",
                    status=Status.STATUS_ACTIVE,
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_GRAM,
                ),
                update_mask=FieldMask(paths=["name", "sku", "status", "uom"]),
            ),
            UpdateProductRequest(
                product=Product(
                    id="prod-002",
                    name="pname002-updated",
                    sku="sku002-updated",
                    status=Status.STATUS_INACTIVE,
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_PIECE,
                ),
                update_mask=FieldMask(paths=["name", "sku", "status", "uom"]),
            ),
        ]
    ).wait(return_options=OperationReturnOptions(response=True))

    # The response is only inspected when the operation reported no error
    if not _print_operation_error(op):
        if resp := op.response():
            for part in resp.parts:
                print(AnyRegistry.unpack(part.data))

    print("Deleting products...")
    # Delete products with partial success and wait for completion
    # The operation will be successful even if some products are not found
    op = client.product.delete_products(
        requests=[
            DeleteProductRequest(id="prod-001"),
            DeleteProductRequest(id="prod-002"),
            DeleteProductRequest(id="prod-003"),
        ],
        allow_partial_success=True,
    ).wait()
    _print_operation_error(op)


if __name__ == "__main__":
    main()

Airflow Provider

Link: https://pypi.org/project/airflow-providers-glassdome-waypoint/

"""
Example: batch create, update and delete products.

This example assumes that the Glassdome Waypoint connection has already been created
with the connection ID "glassdome_waypoint_default".

You can create a connection with a different connection ID and use it for the hook.
"""

from datetime import timedelta

from airflow.sdk import dag, task


@task
def manage_products():
    """Create, re-create, update and delete two sample products.

    Obtains a client from the "glassdome_waypoint_default" connection via
    ``WaypointHook`` and logs the error of every operation. An operation error
    is only logged; the task continues with the next step.
    """
    # Imports stay inside the task body, as in the original example.
    import logging

    from glassdome_waypoint.hooks.waypoint import WaypointHook
    from glassdome_waypoint_sdk import (
        AnyRegistry,
    )
    from glassdome_waypoint_sdk.v1beta1.types import FieldMask, UnitOfMeasure
    from glassdome_waypoint_sdk.v1beta1.types.operation import OperationReturnOptions
    from glassdome_waypoint_sdk.v1beta1.types.product import (
        CreateProductRequest,
        DeleteProductRequest,
        Product,
        Status,
        UpdateProductRequest,
    )

    def new_create_requests():
        """Build a fresh list of create requests for the two sample products."""
        return [
            CreateProductRequest(
                product=Product(
                    id="prod-001",
                    name="pname001",
                    sku="sku001",
                    status=Status.STATUS_ACTIVE,
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_EACH,
                ),
            ),
            CreateProductRequest(
                product=Product(
                    id="prod-002",
                    name="pname002",
                    sku="sku002",
                    status=Status.STATUS_ACTIVE,
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
                ),
            ),
        ]

    def log_operation_error(op):
        """Log the error of *op*, if any; return True when one was logged."""
        err = op.error()
        if not err:
            return False
        logging.error(f"Error code: {err.code}, message: {err.message}")
        for detail in err.details:
            logging.error(AnyRegistry.unpack(detail))
        return True

    client = WaypointHook("glassdome_waypoint_default").get_client()

    logging.info("Creating products...")
    # Start the create operation, then wait for completion as a separate step
    op = client.product.create_products(requests=new_create_requests())
    op.wait()
    log_operation_error(op)

    logging.info("Creating products again...")
    # Create the same products again and wait for completion at the same time
    op = client.product.create_products(requests=new_create_requests()).wait()
    log_operation_error(op)

    logging.info("Updating products...")
    # Update products and wait for completion with response
    op = client.product.update_products(
        requests=[
            UpdateProductRequest(
                product=Product(
                    id="prod-001",
                    name="pname001-updated",
                    sku="sku001-updated",
                    status=Status.STATUS_ACTIVE,
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_GRAM,
                ),
                update_mask=FieldMask(paths=["name", "sku", "status", "uom"]),
            ),
            UpdateProductRequest(
                product=Product(
                    id="prod-002",
                    name="pname002-updated",
                    sku="sku002-updated",
                    status=Status.STATUS_INACTIVE,
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_PIECE,
                ),
                update_mask=FieldMask(paths=["name", "sku", "status", "uom"]),
            ),
        ]
    ).wait(return_options=OperationReturnOptions(response=True))

    # The response is only inspected when the operation reported no error
    if not log_operation_error(op):
        if resp := op.response():
            for part in resp.parts:
                logging.info(AnyRegistry.unpack(part.data))

    logging.info("Deleting products...")
    # Delete products with partial success and wait for completion
    # The operation will be successful even if some products are not found
    op = client.product.delete_products(
        requests=[
            DeleteProductRequest(id="prod-001"),
            DeleteProductRequest(id="prod-002"),
            DeleteProductRequest(id="prod-003"),
        ],
        allow_partial_success=True,
    ).wait()
    log_operation_error(op)


# Example DAG wrapping the single manage_products task.
# No schedule is configured (schedule=None), a DAG run is limited to one
# minute, and the task is not retried.
@dag(
    dag_id="example_products",
    description="Manage products (create, update and delete)",
    tags=["examples"],
    schedule=None,
    dagrun_timeout=timedelta(seconds=60),
    default_args={"owner": "Glassdome", "retries": 0},
)
def example_products():
    # The DAG consists of one task.
    manage_products()


# Instantiate the DAG at module level.
# NOTE: this rebinds the module-level name `dag`, which previously referred
# to the imported decorator; the decorator has already been applied above.
dag = example_products()

# Running this file directly executes the DAG through dag.test().
if __name__ == "__main__":
    dag.test()