Skip to content

Commit

Permalink
fix: checks sync version on sync-sellers
Browse files Browse the repository at this point in the history
  • Loading branch information
elitonzky committed Dec 13, 2024
1 parent affe25d commit 91d74de
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 34 deletions.
102 changes: 81 additions & 21 deletions marketplace/services/vtex/generic_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,26 @@ def _send_insert_task(

class ProductInsertionBySellerService(VtexServiceBase): # pragma: no cover
"""
Service for inserting products by seller into UploadProduct model.
This class is used to fetch products from a specific seller and place them in the upload queue
for subsequent processing and insertion into the database.
Important:
----------
It is necessary to have a feed already configured both locally and on the Meta platform.
Service for inserting products by seller into the UploadProduct model.
This service fetches products from a specific seller and places them in the upload queue
for subsequent processing and database insertion.
Note:
-----
- A feed must already be configured both locally and on the Meta platform.
- Supports both v1 and v2 synchronization methods.
Parameters:
------------
- credentials (APICredentials): API credentials for accessing the VTEX platform.
- catalog (Catalog): The catalog associated with the seller's products.
- sellers (List[str]): A list of seller IDs to fetch products for.
Methods:
---------
- insertion_products_by_seller: Fetches products for the specified sellers and processes them
for insertion into the database or further synchronization.
"""

def insertion_products_by_seller(
Expand All @@ -419,34 +431,73 @@ def insertion_products_by_seller(
catalog: Catalog,
sellers: List[str],
):
"""
Fetches and inserts products by seller into the UploadProduct model.
Parameters:
------------
- credentials (APICredentials): API credentials for accessing the VTEX platform.
- catalog (Catalog): The catalog associated with the seller's products.
- sellers (List[str]): A list of seller IDs to fetch products for.
Raises:
-------
- ValueError: If the `sellers` parameter is not provided.
- Exception: If an error occurs during the bulk save process.
Returns:
--------
- List[FacebookProductDTO]: A list of product DTOs if the `use_sync_v2` flag is True.
- None: If no products are returned for v1 synchronization.
"""
if not sellers:
raise ValueError("'sellers' is required")

# Initialize private service
pvt_service = self.get_private_service(
credentials.app_key, credentials.app_token
)

# Determine if synchronization v2 is used
use_sync_v2 = catalog.vtex_app.config.get("use_sync_v2", False)
upload_on_sync = use_sync_v2

# Fetch product data from the VTEX platform
products_dto = pvt_service.list_all_products(
domain=credentials.domain,
catalog=catalog,
sellers=sellers,
upload_on_sync=upload_on_sync,
update_product=True,
sync_specific_sellers=True,
)
print(f"'list_all_products' returned {len(products_dto)}")
if not products_dto:
return None

close_old_connections()
print("starting bulk save process in database")
all_success = self.product_manager.bulk_save_csv_product_data(
products_dto=products_dto,
catalog=catalog,
product_feed=catalog.feeds.first(),
print(
f"Finished synchronizing products for specific sellers: {sellers}. Use sync v2: {use_sync_v2}."
)
if not all_success:
raise Exception(
f"Error on save csv on database. Catalog:{self.catalog.facebook_catalog_id}"

# Handle v1 synchronization (non-batch upload mode)
if not use_sync_v2:
if not products_dto:
return None

# Close old database connections
close_old_connections()
print(f"'list_all_products' returned {len(products_dto)} products.")
print("Starting bulk save process in the database.")

# Save products in bulk
all_success = self.product_manager.bulk_save_csv_product_data(
products_dto=products_dto,
catalog=catalog,
product_feed=catalog.feeds.first(),
)

if not all_success:
raise Exception(
f"Error saving CSV to the database. Catalog: {catalog.facebook_catalog_id}"
)

return products_dto


Expand All @@ -466,7 +517,10 @@ def start_insertion_by_seller(cls, vtex_app: App, sellers: List[str]):
catalog = cls._validate_link_apps(wpp_cloud, vtex_app)

cls._validate_sync_status(vtex_app)
cls._validate_catalog_feed(catalog)
use_sync_v2 = cls._use_sync_v2(vtex_app)
if use_sync_v2 is False:
cls._validate_catalog_feed(catalog)

cls._validate_connected_catalog_flag(vtex_app)

cls._send_task(credentials, catalog, sellers)
Expand Down Expand Up @@ -551,6 +605,12 @@ def _validate_catalog_feed(catalog) -> ProductFeed:

print("validate_catalog_feed - Ok")

@staticmethod
def _use_sync_v2(vtex_app) -> bool:
use_sync_v2 = vtex_app.config.get("use_sync_v2", False)
print(f"App use_sync_v2: {use_sync_v2}")
return use_sync_v2

@staticmethod
def _send_task(credentials, catalog, sellers: Optional[List[str]] = None) -> None:
from marketplace.celery import app as celery_app
Expand Down
2 changes: 2 additions & 0 deletions marketplace/services/vtex/private/products/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def list_all_products(
sellers: Optional[List[str]] = None,
update_product=False,
upload_on_sync=False,
sync_specific_sellers=False,
) -> List[FacebookProductDTO]:
"""
Fetches and processes all products for the given catalog and sellers.
Expand Down Expand Up @@ -123,6 +124,7 @@ def list_all_products(
catalog=catalog,
update_product=update_product,
upload_on_sync=upload_on_sync,
sync_specific_sellers=sync_specific_sellers,
)
return products_dto

Expand Down
39 changes: 26 additions & 13 deletions marketplace/services/vtex/utils/data_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def process_product_data(
catalog,
update_product=False,
upload_on_sync=False,
sync_specific_sellers=False,
) -> List[FacebookProductDTO]:
"""
Process a batch of SKU IDs with optional active sellers using threads if the batch size is large
Expand All @@ -134,6 +135,7 @@ def process_product_data(
"use_sku_sellers", False
)
self.use_sync_v2 = self.vtex_app.config.get("use_sync_v2", False)
self.sync_specific_sellers = sync_specific_sellers

Check warning on line 138 in marketplace/services/vtex/utils/data_processor.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/utils/data_processor.py#L138

Added line #L138 was not covered by tests

print("Initiated process of product treatment.")
self.progress_bar = tqdm(total=len(skus_ids), desc="[✓:0, ✗:0]", ncols=0)
Expand Down Expand Up @@ -182,21 +184,24 @@ def worker(self):
Processes items from the queue.
- For v2 (`use_sync_v2`) with `update_product=True`
(for example, batch uploads), processes items as `seller_id` and `sku_id` pairs.
(e.g., batch uploads without specific sellers), processes items as `seller_id` and `sku_id` pairs.
- For v1 or initial sync (`update_product=False`), processes only `sku_id` with default seller combinations.
- For v1 or initial sync (`update_product=False`), or when `sync_specific_sellers=True`,
processes only `sku_id` with default seller combinations.
The method dynamically determines the appropriate processing logic
based on the `use_sync_v2` flag and the context (`update_product`).
based on the `use_sync_v2`, `update_product`, and `sync_specific_sellers` flags.
"""
while not self.queue.empty():
try:
# Extract item from the queue
item = self.queue.get()

# Determine processing logic based on `use_sync_v2` and `update_product`
if self.use_sync_v2 and self.update_product:
# Parse `seller_id` and `sku_id` pair for v2 batch uploads
# Determine conditions for processing
is_v2_batch_upload = self.use_sync_v2 and self.update_product

Check warning on line 201 in marketplace/services/vtex/utils/data_processor.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/utils/data_processor.py#L201

Added line #L201 was not covered by tests

if is_v2_batch_upload and not self.sync_specific_sellers:

Check warning on line 203 in marketplace/services/vtex/utils/data_processor.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/utils/data_processor.py#L203

Added line #L203 was not covered by tests
# Parse and process `seller_id` and `sku_id` for v2 batch uploads
seller_id, sku_id = self._parse_seller_sku(item)
processing_result = self.process_seller_sku(
seller_id=seller_id, sku_id=sku_id
Expand All @@ -210,13 +215,7 @@ def worker(self):

except Exception as e:
# Log any processing errors and continue
print(f"Error processing item {item}: {str(e)}")
with self.progress_lock:
self.invalid_products_count += 1
self.progress_bar.set_description(
f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]"
)
self.progress_bar.update(1)
self._handle_worker_error(item, str(e))

Check warning on line 218 in marketplace/services/vtex/utils/data_processor.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/utils/data_processor.py#L218

Added line #L218 was not covered by tests

def _parse_seller_sku(self, seller_sku):
"""
Expand Down Expand Up @@ -252,6 +251,18 @@ def _handle_processing_result(self, processing_result):
)
self.progress_bar.update(1)

def _handle_worker_error(self, item, error_message: str):
"""
Handles errors during worker processing by logging and updating progress.
"""
print(f"Error processing item {item}: {error_message}")
with self.progress_lock:
self.invalid_products_count += 1
self.progress_bar.set_description(

Check warning on line 261 in marketplace/services/vtex/utils/data_processor.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/utils/data_processor.py#L258-L261

Added lines #L258 - L261 were not covered by tests
f"[✓:{len(self.results)} | DB:{self.sent_to_db_count} | ✗:{self.invalid_products_count}]"
)
self.progress_bar.update(1)

Check warning on line 264 in marketplace/services/vtex/utils/data_processor.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/utils/data_processor.py#L264

Added line #L264 was not covered by tests

def process_single_sku(self, sku_id):
"""
Process a single SKU by validating its details and simulating availability across multiple sellers.
Expand Down Expand Up @@ -402,6 +413,7 @@ def process_sellers_skus_batch(
catalog,
seller_sku_pairs,
upload_on_sync=True,
sync_specific_sellers=False,
):
"""
Process a batch of seller and SKU pairs using threads if the batch size is large.
Expand All @@ -422,6 +434,7 @@ def process_sellers_skus_batch(
self.sku_validator = SKUValidator(service, domain, MockZeroShotClient())
self.sent_to_db_count = 0 # Tracks the number of items sent to the database.
self.update_product = True
self.sync_specific_sellers = sync_specific_sellers

Check warning on line 437 in marketplace/services/vtex/utils/data_processor.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/utils/data_processor.py#L437

Added line #L437 was not covered by tests

# Populate the queue
initial_batch_count = len(seller_sku_pairs)
Expand Down

0 comments on commit 91d74de

Please sign in to comment.