Products
The Products service exposes API operations for managing product data.
📄️ ListProducts
Lists products with pagination.
📄️ CreateProduct
Creates a new product.
📄️ GetProduct
Retrieves a single product by ID.
📄️ DeleteProduct
Deletes a product.
📄️ UndeleteProduct
Restores a deleted product.
📄️ UpdateProduct
Updates a product. Supports upsert when `allow_missing = true`.
📄️ CreateProducts
Creates products in batch as a long-running operation.
📄️ DeleteProducts
Deletes products in batch as a long-running operation.
📄️ UndeleteProducts
Restores deleted products in batch as a long-running operation.
📄️ UpdateProducts
Updates products in batch as a long-running operation.
Code Examples
Python SDK
Link: https://pypi.org/project/glassdome-waypoint-sdk/
"""
Example: batch create, update and delete products.
Usage:
export WAYPOINT_BASE_URL="https://waypoint.glassdome.dev"
export WAYPOINT_API_KEY="your-api-key"
python products.py
"""
import os
from glassdome_waypoint_sdk import (
AnyRegistry,
WaypointClient,
WaypointConfig,
)
from glassdome_waypoint_sdk.v1beta1.types import FieldMask, UnitOfMeasure
from glassdome_waypoint_sdk.v1beta1.types.operation import OperationReturnOptions
from glassdome_waypoint_sdk.v1beta1.types.product import (
CreateProductRequest,
DeleteProductRequest,
Product,
Status,
UpdateProductRequest,
)
def _require_env(name):
    """Return the value of environment variable *name*, or exit with a message.

    An unset or empty variable terminates the script with
    ``"<name> is not set"``. ``SystemExit`` is raised directly instead of
    calling the ``exit`` helper: that helper is installed by the ``site``
    module for interactive sessions and is not available when Python runs
    with ``-S`` or from a frozen/embedded interpreter.
    """
    value = os.getenv(name)
    if not value:
        raise SystemExit(f"{name} is not set")
    return value


def _sample_create_requests():
    """Build fresh create requests for the two sample products.

    A new list is returned on every call so that the two create operations
    in ``main`` never share request objects.
    """
    return [
        CreateProductRequest(
            product=Product(
                id="prod-001",
                name="pname001",
                sku="sku001",
                status=Status.STATUS_ACTIVE,
                uom=UnitOfMeasure.UNIT_OF_MEASURE_EACH,
            ),
        ),
        CreateProductRequest(
            product=Product(
                id="prod-002",
                name="pname002",
                sku="sku002",
                status=Status.STATUS_ACTIVE,
                uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
            ),
        ),
    ]


def _print_error(op):
    """Print the error of a finished operation, if it has one.

    Returns True when an error was printed and False otherwise, so callers
    can decide whether to go on and read the operation's response.
    """
    err = op.error()
    if not err:
        return False
    print(f"Error code: {err.code}, message: {err.message}")
    for detail in err.details:
        print(AnyRegistry.unpack(detail))
    return True


def main():
    """Run the batch create / create-again / update / delete walkthrough.

    Reads WAYPOINT_BASE_URL and WAYPOINT_API_KEY from the environment and
    exits with a message if either is missing. Operation errors are printed,
    not raised, so every step runs regardless of earlier failures.
    """
    base_url = _require_env("WAYPOINT_BASE_URL")
    api_key = _require_env("WAYPOINT_API_KEY")
    client = WaypointClient.from_api_key(WaypointConfig(base_url=base_url), api_key)

    print("Creating products...")
    # Create products, then wait for completion as a separate step.
    op = client.product.create_products(requests=_sample_create_requests())
    op.wait()
    _print_error(op)

    print("Creating products again...")
    # Create the same products again, this time chaining wait() onto the call.
    op = client.product.create_products(requests=_sample_create_requests()).wait()
    _print_error(op)

    print("Updating products...")
    # Update products and wait for completion, asking for the response to be
    # returned with the finished operation.
    op = client.product.update_products(
        requests=[
            UpdateProductRequest(
                product=Product(
                    id="prod-001",
                    name="pname001-updated",
                    sku="sku001-updated",
                    status=Status.STATUS_ACTIVE,
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_GRAM,
                ),
                update_mask=FieldMask(paths=["name", "sku", "status", "uom"]),
            ),
            UpdateProductRequest(
                product=Product(
                    id="prod-002",
                    name="pname002-updated",
                    sku="sku002-updated",
                    status=Status.STATUS_INACTIVE,
                    uom=UnitOfMeasure.UNIT_OF_MEASURE_PIECE,
                ),
                update_mask=FieldMask(paths=["name", "sku", "status", "uom"]),
            ),
        ]
    ).wait(return_options=OperationReturnOptions(response=True))
    # The response is only inspected when the operation reported no error.
    if not _print_error(op):
        if resp := op.response():
            for part in resp.parts:
                print(AnyRegistry.unpack(part.data))

    print("Deleting products...")
    # Delete products with partial success and wait for completion.
    # The operation will be successful even if some products are not found;
    # "prod-003" is never created by this script.
    op = client.product.delete_products(
        requests=[
            DeleteProductRequest(id="prod-001"),
            DeleteProductRequest(id="prod-002"),
            DeleteProductRequest(id="prod-003"),
        ],
        allow_partial_success=True,
    ).wait()
    _print_error(op)


if __name__ == "__main__":
    main()
Airflow Provider
Link: https://pypi.org/project/airflow-providers-glassdome-waypoint/
"""
Example: batch create, update and delete products.
This example assumes that the Glassdome Waypoint connection has already been created
with the connection ID "glassdome_waypoint_default".
You can create a connection with a different connection ID and use it for the hook.
"""
from datetime import timedelta
from airflow.sdk import dag, task
# Airflow task: batch create, create-again, update and delete sample products
# through the client obtained from the "glassdome_waypoint_default" connection.
# Operation errors are logged, not raised, so every step runs.
# NOTE(review): documented with comments rather than a docstring so the
# decorated function's __doc__ stays unset, exactly as in the original.
@task
def manage_products():
    # All imports live inside the task body, as in the original example.
    import logging

    from glassdome_waypoint.hooks.waypoint import WaypointHook
    from glassdome_waypoint_sdk import (
        AnyRegistry,
    )
    from glassdome_waypoint_sdk.v1beta1.types import FieldMask, UnitOfMeasure
    from glassdome_waypoint_sdk.v1beta1.types.operation import OperationReturnOptions
    from glassdome_waypoint_sdk.v1beta1.types.product import (
        CreateProductRequest,
        DeleteProductRequest,
        Product,
        Status,
        UpdateProductRequest,
    )

    def build_create_requests():
        # Fresh request objects on every call; the two create operations
        # below never share instances.
        first = Product(
            id="prod-001",
            name="pname001",
            sku="sku001",
            status=Status.STATUS_ACTIVE,
            uom=UnitOfMeasure.UNIT_OF_MEASURE_EACH,
        )
        second = Product(
            id="prod-002",
            name="pname002",
            sku="sku002",
            status=Status.STATUS_ACTIVE,
            uom=UnitOfMeasure.UNIT_OF_MEASURE_KILOGRAM,
        )
        return [
            CreateProductRequest(product=first),
            CreateProductRequest(product=second),
        ]

    def log_failure(operation):
        # Logs the operation's error and its details; returns True when an
        # error was present so the caller can skip reading the response.
        failure = operation.error()
        if not failure:
            return False
        logging.error(f"Error code: {failure.code}, message: {failure.message}")
        for item in failure.details:
            logging.error(AnyRegistry.unpack(item))
        return True

    waypoint = WaypointHook("glassdome_waypoint_default").get_client()

    logging.info("Creating products...")
    # Create products, then wait for completion as a separate step.
    create_op = waypoint.product.create_products(requests=build_create_requests())
    create_op.wait()
    log_failure(create_op)

    logging.info("Creating products again...")
    # Same products again, with wait() chained directly onto the call.
    recreate_op = waypoint.product.create_products(
        requests=build_create_requests()
    ).wait()
    log_failure(recreate_op)

    logging.info("Updating products...")
    # Update products and wait for completion, requesting the response.
    masked_fields = ["name", "sku", "status", "uom"]
    update_requests = [
        UpdateProductRequest(
            product=Product(
                id="prod-001",
                name="pname001-updated",
                sku="sku001-updated",
                status=Status.STATUS_ACTIVE,
                uom=UnitOfMeasure.UNIT_OF_MEASURE_GRAM,
            ),
            update_mask=FieldMask(paths=list(masked_fields)),
        ),
        UpdateProductRequest(
            product=Product(
                id="prod-002",
                name="pname002-updated",
                sku="sku002-updated",
                status=Status.STATUS_INACTIVE,
                uom=UnitOfMeasure.UNIT_OF_MEASURE_PIECE,
            ),
            update_mask=FieldMask(paths=list(masked_fields)),
        ),
    ]
    update_op = waypoint.product.update_products(requests=update_requests).wait(
        return_options=OperationReturnOptions(response=True)
    )
    # The response is only read when the operation reported no error.
    if not log_failure(update_op):
        result = update_op.response()
        if result:
            for part in result.parts:
                logging.info(AnyRegistry.unpack(part.data))

    logging.info("Deleting products...")
    # Delete products with partial success and wait for completion.
    # The operation will be successful even if some products are not found;
    # "prod-003" is never created by this task.
    delete_requests = [
        DeleteProductRequest(id=product_id)
        for product_id in ("prod-001", "prod-002", "prod-003")
    ]
    delete_op = waypoint.product.delete_products(
        requests=delete_requests,
        allow_partial_success=True,
    ).wait()
    log_failure(delete_op)
# DAG wrapping the single manage_products task. No schedule is configured
# (schedule=None), a DAG run times out after one minute, and tasks are not
# retried.
# NOTE(review): documented with comments rather than a docstring so the
# decorated function's __doc__ stays unset, exactly as in the original.
@dag(
    dag_id="example_products",
    description="Manage products (create, update and delete)",
    tags=["examples"],
    schedule=None,
    dagrun_timeout=timedelta(minutes=1),
    default_args=dict(owner="Glassdome", retries=0),
)
def example_products():
    manage_products()


# Instantiate the DAG at module level. This rebinds the name `dag`, which up
# to this point referred to the decorator imported from airflow.sdk.
dag = example_products()

# Running this file directly executes the DAG once through dag.test().
if __name__ == "__main__":
    dag.test()