CI Python Linter

import logging
import os
from datetime import datetime
from pathlib import Path

import pandas as pd
from django.conf import settings
from django.core.management.base import BaseCommand

# Use the custom 'shoply' logger for application-specific logging
logger = logging.getLogger('shoply')

# Number of rows sampled per element when the user leaves the prompt blank.
_DEFAULT_SAMPLE_SIZE = 50


class Command(BaseCommand):
    """Interactively sample rows from a CSV file under ``settings.BASE_DIR``.

    The command prompts for a CSV path (relative to ``BASE_DIR``), a column
    header to group by, an optional comma-separated list of values of that
    column, a per-value sample size, and whether sampling is random. The
    sampled rows are written to ``data/processed`` under ``BASE_DIR`` and the
    user is then asked whether the original file should be deleted.
    """

    # NOTE(review): the help text mentions downloading from Kaggle, but
    # handle() only reads a local file -- confirm the intended wording.
    help = 'Download and organize dataset from Kaggle'

    def handle(self, *args, **kwargs):
        """Run the interactive sampling workflow.

        All failures are reported through the 'shoply' logger and end the
        command early; nothing is raised to the caller for the handled cases
        (missing file, unreadable CSV, unknown header, invalid sample size,
        empty result, write failure, delete failure).
        """
        # Step 1: Prompt user for required inputs
        file_name = input(
            "What file would you like to process? "
            "(Example: data/raw/your_dataset.csv): "
        ).strip()
        file_path = Path(settings.BASE_DIR) / file_name

        # Require a regular file: an empty answer would otherwise resolve to
        # BASE_DIR itself, which exists but is not a readable dataset.
        if not file_path.is_file():
            logger.error(f"File not found at {file_path}")
            return

        header = input(
            "What header would you like to sample by? "
            "(e.g., category): "
        ).strip()
        selected_elements = input(
            "Which elements should be selected from this header "
            "(leave blank for all)? (e.g., electronics, clothing): "
        ).strip()
        raw_sample_size = input(
            "How many samples should be taken per category? "
            "(Default: 50): "
        ).strip()
        sample_random = input(
            "Would you like the sample to be random? "
            "(yes/no, Default: yes): "
        ).strip().lower()

        # Set defaults if necessary; reject input that is not a positive
        # integer instead of crashing on int() or mis-sampling later.
        try:
            sample_size = (
                int(raw_sample_size)
                if raw_sample_size
                else _DEFAULT_SAMPLE_SIZE
            )
        except ValueError:
            logger.error(
                f"Invalid sample size '{raw_sample_size}': "
                "expected a whole number."
            )
            return
        if sample_size < 1:
            logger.error(
                f"Invalid sample size {sample_size}: must be at least 1."
            )
            return

        # Anything other than an explicit "no" (including blank) means random.
        sample_random = sample_random != "no"

        # Convert selected elements to a list, dropping blank entries such as
        # those produced by a trailing or doubled comma.
        elements_list = [
            elem.strip()
            for elem in selected_elements.split(',')
            if elem.strip()
        ]

        # Step 2: Load the dataset
        try:
            df = pd.read_csv(file_path)
            logger.info(f"Loaded dataset from {file_path}")
        except pd.errors.ParserError:
            logger.error(
                f"Error parsing the file: {file_path}. "
                "The file may not be a valid CSV."
            )
            return
        except Exception as e:
            logger.error(f"Error loading file: {e}")
            return

        # Step 3: Filter by the specified header and elements
        if header not in df.columns:
            logger.error(
                f"Header '{header}' not found in the dataset columns."
            )
            return

        # If no elements are provided, sample all unique values from the header
        if not elements_list:
            logger.info(
                f"No specific elements selected for '{header}'. "
                "Using all unique values from this header."
            )
            # Missing values never compare equal to themselves, so they could
            # not be matched in the loop below; leave them out up front.
            elements_list = df[header].dropna().unique()

        # Filter the dataset based on the elements provided
        df_filtered = df[df[header].isin(elements_list)]
        logger.info(
            f"Filtered dataset by elements in '{header}': {elements_list}"
        )

        # Step 4: Sample data from each category in the header
        sampled_data = []
        for element in elements_list:
            # Filter the rows where the header matches the current element
            group = df_filtered[df_filtered[header] == element]

            if group.empty:
                logger.warning(
                    f"No data for element '{element}' in header '{header}'"
                )
                continue

            # Randomly sample data, or take all if fewer than sample_size
            if sample_random:
                group_sampled = group.sample(
                    n=min(sample_size, len(group)),
                    random_state=None
                )
            else:
                # If no random sampling, take the first `sample_size` rows
                group_sampled = group.head(sample_size)

            # Add the sampled group to the list of sampled data
            sampled_data.append(group_sampled)
            logger.info(
                f"Sampled {len(group_sampled)} from '{element}' "
                f"({len(group)})"
            )

        # Check the length of sampled_data before concatenation
        if not sampled_data:
            logger.error(
                "No samples selected or sampled_data is empty."
            )
            return

        # Combine all sampled data into a single DataFrame
        try:
            final_sampled_data = pd.concat(sampled_data, ignore_index=True)
            logger.info(
                f"Total sampled data: {len(final_sampled_data)} "
                "rows"
            )
        except ValueError as e:
            logger.error(f"Error while concatenating data: {e}")
            return

        # Step 5: Save the processed data to the 'data/processed' directory
        processed_dir = Path(settings.BASE_DIR) / 'data/processed'
        processed_dir.mkdir(parents=True, exist_ok=True)

        # Generate a new file name based on the original. The name is built
        # in a single f-string because Path objects do not support `+=` with
        # a str.
        timestamp = datetime.now().strftime("%d-%m-%Y_%H-%M-%S")
        processed_file_path = (
            processed_dir / f"{timestamp}_processed_{file_path.stem}.csv"
        )

        try:
            final_sampled_data.to_csv(processed_file_path, index=False)
            logger.info(
                f"Processed data successfully saved to: {processed_file_path}"
            )
        except Exception as e:
            logger.error(f"Error saving processed data: {e}")
            return

        # Step 6: Confirm deletion of the original file
        delete_original = input(
            "Would you like to delete the original "
            "dataset file? (yes/no): "
        ).strip().lower()

        # Only an explicit "yes" deletes; any other answer keeps the file.
        if delete_original == "yes":
            try:
                os.remove(file_path)
                logger.info(
                    f"Original dataset file {file_path} deleted successfully."
                )
            except OSError as e:
                logger.error(f"Error deleting file: {e}")
        else:
            logger.info(f"Original dataset file retained: {file_path}")

Settings:


Results:

All clear, no errors found