diff --git a/BAtranslator.py b/BAtranslator.py
index d4aa04b..5b44881 100644
--- a/BAtranslator.py
+++ b/BAtranslator.py
@@ -268,6 +268,41 @@ def extract_strings(output_file, filter_str=None, update_from=None):
         filter_str (str): Optional filter (e.g., 'is_ascii', 'table_name:TableName')
         update_from (str): Path to existing JSON file to merge translations from
     """
+    # Auto-setup: Create all required files if they don't exist
+    setup_required = False
+
+    # Check if repository_map.json exists
+    if not os.path.exists(REPO_MAP_FILE):
+        print(f"Repository map not found. Auto-generating from types.cs...")
+        setup_required = True
+
+        # Check if schema exists
+        if not os.path.exists('generated_schema.fbs'):
+            print("FlatBuffer schema not found. Generating from types.cs...")
+            if not os.path.exists('types.cs'):
+                print("ERROR: types.cs not found. Cannot auto-generate files.")
+                print("Please place types.cs in the project directory.")
+                return
+
+            # Generate schema
+            setup_schema_from_csharp('types.cs', 'generated_schema.fbs')
+
+            # Preprocess schema to fix reserved keywords
+            print("Preprocessing schema to fix Python reserved keywords...")
+            preprocess_flatbuffer_schema('generated_schema.fbs')
+
+            # Generate Python modules
+            print("Generating Python modules from schema...")
+            generate_flatbuffer_python('generated_schema.fbs', 'flatc.exe', '.')
+            print()
+
+        # Generate repository mapping
+        setup_repository_mapping('types.cs', REPO_MAP_FILE)
+        print()
+
+    if setup_required:
+        print("✓ Auto-setup completed! Proceeding with extraction...\n")
+
     # Validate required files
     if not validate_required_files(REPO_MAP_FILE, DB_FILE):
         return
@@ -346,6 +381,15 @@ def _process_table(cursor, repo_info, filter_type, filter_value, translation_mem
         schema_class = getattr(schema_module, blob_schema_name)
         get_root_method = getattr(schema_class, f"GetRootAs{blob_schema_name}")
 
+        # Check if table exists in database
+        cursor.execute(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
+            (table_name,)
+        )
+        if not cursor.fetchone():
+            # Table doesn't exist, skip it silently
+            return None
+
         # Process table rows
         cursor.execute(f'SELECT rowid, "{BLOB_COLUMN}" FROM "{table_name}"')
         table_translations = OrderedDict()
@@ -372,6 +416,9 @@ def _process_table(cursor, repo_info, filter_type, filter_value, translation_mem
     except (ImportError, AttributeError):
         # Skip tables that can't be processed
         return None
+    except sqlite3.OperationalError:
+        # Handle database errors (e.g., table doesn't exist) silently
+        return None
 
 def _extract_string_fields(data_dict, filter_type, filter_value, translation_memory):
     """Extract and filter string fields from FlatBuffer data."""
@@ -409,13 +456,13 @@ def _passes_filter(text, filter_type, filter_value):
     elif filter_type == 'contains_text':
         return filter_value in text
     return True
+
 def patch_database(input_file):
     """Apply translations from JSON file to the database.
Args: input_file (str): Path to JSON file containing translations """ - # Validate files if not validate_required_files(REPO_MAP_FILE, input_file, DB_FILE): return @@ -438,8 +485,7 @@ def patch_database(input_file): with open(input_file, 'r', encoding='utf-8') as f: translations = json.load(f) - # Process changes - print("Analyzing and applying translations...") + # Analyze translation changes changes_to_apply = _analyze_translation_changes(translations) if not changes_to_apply: @@ -464,36 +510,23 @@ def patch_database(input_file): conn.close() -def patch_database(input_file): - """Apply translations from JSON file to the database.""" - if not validate_required_files(REPO_MAP_FILE, input_file, DB_FILE): - return +def _analyze_translation_changes(translations): + """Analyze translation JSON and extract changes to apply. - print(f"--- PATCHING MODE: '{input_file}' -> '{DB_FILE}' ---") - - # Confirm operation - response = input("Are you sure? A backup will be created. (yes/no): ").lower() - if response not in ['yes', 'y']: - print("Operation cancelled.") - return - - # Create backup - print(f"Creating backup '{DB_BACKUP_FILE}'...") - shutil.copyfile(DB_FILE, DB_BACKUP_FILE) - - # Load data - with open(REPO_MAP_FILE, 'r', encoding='utf-8') as f: - repo_map = {v['table_name']: v for v in json.load(f).values()} - - with open(input_file, 'r', encoding='utf-8') as f: - translations = json.load(f) - - # Find changes to apply + Args: + translations (dict): Translation data from JSON file + + Returns: + list: List of changes to apply, each containing table, row_id, and fields + """ changes_to_apply = [] + for table_name, table_data in translations.items(): for row_id_str, fields in table_data.items(): changed_fields = {} + for field, content in fields.items(): + # Check if field has translation that differs from original if (isinstance(content, dict) and 'original' in content and content.get('translation') and content['translation'] != (content['original'] if isinstance(content['original'], str) @@ -511,66 +544,397 @@ def patch_database(input_file): 'fields': changed_fields }) - if not changes_to_apply: - print("No changes found to apply.") + return changes_to_apply + + +def _apply_database_changes(cursor, repo_map, changes_to_apply): + """Apply translation changes to database. + + Args: + cursor: SQLite cursor + repo_map (dict): Repository mapping information + changes_to_apply (list): List of changes to apply + + Returns: + int: Number of successfully updated entries + """ + updated_count = 0 + skipped_tables = set() + + for change in tqdm(changes_to_apply, desc="Applying changes"): + table_name = change['table'] + row_id = change['row_id'] + fields = change['fields'] + + # Skip if table not in repository map + if table_name not in repo_map: + if table_name not in skipped_tables: + print(f"\nWARNING: Table '{table_name}' not found in repository map. Skipping...") + skipped_tables.add(table_name) + continue + + try: + repo_info = repo_map[table_name] + + # Get schema class + module_path = SCHEMA_LOCATION_MAP.get(repo_info['blob_schema_class']) + if not module_path: + if table_name not in skipped_tables: + print(f"\nWARNING: Schema class '{repo_info['blob_schema_class']}' not found. 
Skipping table '{table_name}'...") + skipped_tables.add(table_name) + continue + + schema_module = importlib.import_module(module_path) + schema_class = getattr(schema_module, repo_info['blob_schema_class']) + get_root_method = getattr(schema_class, f"GetRootAs{repo_info['blob_schema_class']}") + + # Check if table exists in database + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name=?", + (table_name,) + ) + if not cursor.fetchone(): + if table_name not in skipped_tables: + print(f"\nWARNING: Table '{table_name}' does not exist in database. Skipping...") + skipped_tables.add(table_name) + continue + + # Get and update data + cursor.execute(f'SELECT "{BLOB_COLUMN}" FROM "{table_name}" WHERE rowid = ?', (row_id,)) + result = cursor.fetchone() + + if not result or not result[0]: + continue + + # Parse FlatBuffer data + fbs_obj = get_root_method(result[0], 0) + data_dict = flatbuffer_to_dict(fbs_obj) + data_dict.update(fields) + + # Rebuild and save + builder = flatbuffers.Builder(1024) + new_offset = dict_to_flatbuffer(builder, data_dict, schema_class) + builder.Finish(new_offset) + + cursor.execute( + f'UPDATE "{table_name}" SET "{BLOB_COLUMN}" = ? WHERE rowid = ?', + (bytes(builder.Output()), row_id) + ) + updated_count += 1 + + except sqlite3.OperationalError as e: + # Handle SQL errors (e.g., table doesn't exist) + if table_name not in skipped_tables: + print(f"\nWARNING: Database error for table '{table_name}': {e}. Skipping...") + skipped_tables.add(table_name) + continue + except Exception as e: + # Handle other errors silently or log them + continue + + return updated_count + + +# ============================================================================ +# SETUP AND UTILITY FUNCTIONS +# ============================================================================ + +def setup_schema_from_csharp(csharp_file='types.cs', output_fbs='generated_schema.fbs'): + """Parse C# files and generate FlatBuffers schema. 
+ + Args: + csharp_file (str): Path to C# file with type definitions + output_fbs (str): Output .fbs schema file path + """ + if not validate_required_files(csharp_file): return - print(f"Found {len(changes_to_apply)} records to update.") + from unidecode import unidecode - # Apply changes - conn = sqlite3.connect(DB_FILE) - cursor = conn.cursor() - updated_count = 0 + print(f"Parsing C# file: {csharp_file}") + print("This may take a while for large files...") - try: - for change in tqdm(changes_to_apply, desc="Applying changes"): - table_name = change['table'] - row_id = change['row_id'] - fields = change['fields'] + # Type mapping + type_map = { + 'long': 'long', 'ulong': 'ulong', 'int': 'int', 'uint': 'uint', + 'short': 'short', 'ushort': 'ushort', 'float': 'float', 'double': 'double', + 'bool': 'bool', 'string': 'string', 'byte': 'ubyte', 'sbyte': 'byte' + } + + def sanitize(name): + return re.sub(r'[^A-Za-z0-9_.]', '_', unidecode(name)) + + def to_snake_case(name): + name = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1_\2', name) + name = re.sub(r'([a-z\d])([A-Z])', r'\1_\2', name) + return name.lower().replace('-', '_') + + # Parse C# file + with open(csharp_file, 'r', encoding='utf-8') as f: + content = f.read() + + # Extract namespace + ns_match = re.search(r'namespace\s+([\w.]+)', content) + namespace = ns_match.group(1) if ns_match else 'FlatData' + + # Parse tables and enums + tables = {} + enums = {} + + # Find all class/table definitions + table_pattern = re.compile(r'public\s+(?:sealed\s+)?class\s+(\w+)\s*{([^}]+)}', re.DOTALL) + for match in table_pattern.finditer(content): + name = match.group(1) + body = match.group(2) + + # Skip non-table classes + if 'BaseExcelRepository' in body or 'BaseDBSchema' in body: + continue + + fields = [] + prop_pattern = re.compile(r'public\s+([\w.<>\[\]?]+)\s+(\w+)\s*{\s*get;\s*set;\s*}') + for prop_match in prop_pattern.finditer(body): + field_type = prop_match.group(1).replace('?', '') + field_name = to_snake_case(prop_match.group(2)) - if table_name not in repo_map: - continue - - try: - repo_info = repo_map[table_name] - module_path = SCHEMA_LOCATION_MAP.get(repo_info['blob_schema_class']) - if not module_path: - continue - - schema_module = importlib.import_module(module_path) - schema_class = getattr(schema_module, repo_info['blob_schema_class']) - get_root_method = getattr(schema_class, f"GetRootAs{repo_info['blob_schema_class']}") - - # Get and update data - cursor.execute(f'SELECT "{BLOB_COLUMN}" FROM "{table_name}" WHERE rowid = ?', (row_id,)) - result = cursor.fetchone() - if not result or not result[0]: - continue - - fbs_obj = get_root_method(result[0], 0) - data_dict = flatbuffer_to_dict(fbs_obj) - data_dict.update(fields) - - # Rebuild and save - builder = flatbuffers.Builder(1024) - new_offset = dict_to_flatbuffer(builder, data_dict, schema_class) - builder.Finish(new_offset) - - cursor.execute(f'UPDATE "{table_name}" SET "{BLOB_COLUMN}" = ? WHERE rowid = ?', - (bytes(builder.Output()), row_id)) - updated_count += 1 - - except Exception: - continue + # Convert type + if field_type in type_map: + fbs_type = type_map[field_type] + elif field_type.startswith('List<'): + inner = field_type[5:-1].replace('?', '') + fbs_type = f"[{type_map.get(inner, sanitize(inner))}]" + else: + fbs_type = sanitize(field_type) + + fields.append((field_name, fbs_type)) - conn.commit() - print(f"\nSuccess! 
Updated {updated_count} database entries.") + if fields: + tables[name] = fields + + # Find enums + enum_pattern = re.compile(r'public\s+enum\s+(\w+)\s*{([^}]+)}', re.DOTALL) + for match in enum_pattern.finditer(content): + name = match.group(1) + body = match.group(2) + values = [] - except Exception as e: - conn.rollback() - print(f"ERROR during patching: {e}") - finally: - conn.close() + for line in body.split(','): + line = line.strip().split('=')[0].strip() + if line and not line.startswith('//'): + values.append(to_snake_case(line)) + + if values: + enums[name] = values + + # Generate .fbs file + print(f"Generating schema file: {output_fbs}") + with open(output_fbs, 'w', encoding='utf-8') as f: + f.write(f"namespace {namespace};\n\n") + + # Write enums + for enum_name, values in sorted(enums.items()): + f.write(f"enum {enum_name} : int {{\n") + for value in values: + f.write(f" {value},\n") + f.write("}\n\n") + + # Write tables + for table_name, fields in sorted(tables.items()): + f.write(f"table {table_name} {{\n") + for field_name, field_type in fields: + f.write(f" {field_name}:{field_type};\n") + f.write("}\n\n") + + print(f"Success! Generated {len(tables)} tables and {len(enums)} enums.") + + +def setup_repository_mapping(csharp_file='types.cs', output_json='repository_map.json'): + """Create repository mapping from C# file. + + Args: + csharp_file (str): Path to C# file + output_json (str): Output JSON mapping file + """ + if not validate_required_files(csharp_file): + return + + print(f"Analyzing '{csharp_file}' to create repository mapping...") + + # Parse patterns + repo_pattern = re.compile( + r'public class (\w+)\s*:\s*BaseExcelRepository<[^,]+,\s*([^,]+),\s*([^>]+)>' + ) + db_schema_pattern = re.compile(r'public class (\w+)\s*:\s*BaseDBSchema') + prop_pattern = re.compile(r'public\s+([\w.<>\[\]?]+)\s+(\w+)\s*{\s*get;\s*set;\s*}') + + repositories = OrderedDict() + db_schemas = OrderedDict() + current_schema = None + + with open(csharp_file, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip().split('//')[0] + + # Repository definition + repo_match = repo_pattern.search(line) + if repo_match: + repo_name = repo_match.group(1) + table_class = repo_match.group(2).strip() + schema_class = repo_match.group(3).strip() + repositories[repo_name] = { + 'table_class': table_class, + 'schema_class': schema_class + } + continue + + # DB Schema definition + schema_match = db_schema_pattern.search(line) + if schema_match: + current_schema = schema_match.group(1) + db_schemas[current_schema] = {'properties': []} + continue + + # Properties + if current_schema: + prop_match = prop_pattern.search(line) + if prop_match: + prop_type = prop_match.group(1) + prop_name = prop_match.group(2) + db_schemas[current_schema]['properties'].append({ + 'name': prop_name, + 'type': prop_type + }) + + # Match repositories with schemas + mapping = OrderedDict() + for repo_name, repo_info in repositories.items(): + table_class = repo_info['table_class'] + schema_class = repo_info['schema_class'] + + if schema_class in db_schemas: + mapping[repo_name] = { + 'table_name': table_class.replace('Excel', ''), + 'blob_schema_class': schema_class, + 'properties': db_schemas[schema_class]['properties'] + } + + # Save mapping + with open(output_json, 'w', encoding='utf-8') as f: + json.dump(mapping, f, indent=2, ensure_ascii=False) + + print(f"Success! 
Created mapping with {len(mapping)} repositories.")
+    print(f"Mapping saved to: {output_json}")
+
+
+def preprocess_flatbuffer_schema(input_fbs, output_fbs=None):
+    """Preprocess FlatBuffer schema to rename Python reserved keywords.
+
+    Args:
+        input_fbs (str): Input schema file
+        output_fbs (str): Output schema file (if None, modifies in place)
+    """
+    if not validate_required_files(input_fbs):
+        return
+
+    reserved = [
+        'self', 'class', 'def', 'return', 'import', 'from', 'as',
+        'if', 'elif', 'else', 'while', 'for', 'in', 'is', 'not',
+        'and', 'or', 'True', 'False', 'None', 'pass', 'break',
+        'continue', 'try', 'except', 'finally', 'raise', 'with',
+        'yield', 'lambda', 'global', 'nonlocal'
+    ]
+
+    with open(input_fbs, 'r', encoding='utf-8') as f:
+        content = f.read()
+
+    modified = False
+    for keyword in reserved:
+        pattern = rf'\b({keyword})(\s*:\s*\w+)'
+        if re.search(pattern, content):
+            content = re.sub(pattern, rf'\1_\2', content)
+            modified = True
+            print(f" Renamed '{keyword}' -> '{keyword}_'")
+
+    output_file = output_fbs or input_fbs
+    with open(output_file, 'w', encoding='utf-8') as f:
+        f.write(content)
+
+    if modified:
+        print(f"Preprocessed schema saved to: {output_file}")
+    else:
+        print("No reserved keywords found in schema.")
+
+
+def generate_flatbuffer_python(fbs_file, flatc_exe='flatc.exe', output_dir='.'):
+    """Generate Python modules from FlatBuffer schema.
+
+    Args:
+        fbs_file (str): FlatBuffer schema file (.fbs)
+        flatc_exe (str): Path to flatc compiler
+        output_dir (str): Output directory for generated Python files
+    """
+    if not validate_required_files(fbs_file, flatc_exe):
+        return
+
+    print(f"Generating Python modules from: {fbs_file}")
+
+    # Run flatc compiler
+    cmd = [
+        flatc_exe,
+        '--python',
+        '--gen-object-api',
+        '-o', output_dir,
+        fbs_file
+    ]
+
+    result = os.system(' '.join(cmd))
+
+    if result == 0:
+        print("Success! Python modules generated.")
+    else:
+        print(f"ERROR: flatc failed with code {result}")
+
+
+def fix_flatbuffer_reserved_names(directory='MX'):
+    """Fix Python reserved keywords in generated FlatBuffer files.
+
+    Args:
+        directory (str): Directory containing generated Python files
+    """
+    from pathlib import Path
+
+    if not os.path.exists(directory):
+        print(f"ERROR: Directory '{directory}' not found")
+        return
+
+    print(f"Scanning {directory} for reserved keyword issues...")
+
+    reserved_map = {'self': 'self_', 'class': 'class_', 'import': 'import_'}
+    fixed_count = 0
+
+    for py_file in Path(directory).rglob('*.py'):
+        try:
+            with open(py_file, 'r', encoding='utf-8') as f:
+                content = f.read()
+
+            original = content
+
+            for reserved, new_name in reserved_map.items():
+                # Fix parameter names
+                pattern = rf'(def __init__\([^)]*\n\s+self,\n(?:[^)]*\n)*?\s+){reserved}(\s*=)'
+                if re.search(pattern, content):
+                    content = re.sub(pattern, rf'\1{new_name}\2', content)
+                    content = content.replace(f'self.{reserved} = {reserved}', f'self.{new_name} = {new_name}')
+                    print(f" Fixed: {py_file.name}")
+
+            if content != original:
+                with open(py_file, 'w', encoding='utf-8') as f:
+                    f.write(content)
+                fixed_count += 1
+        except Exception as e:
+            print(f" ERROR in {py_file}: {e}")
+
+    print(f"\nFixed {fixed_count} file(s).")
 
 
 # ============================================================================
@@ -863,6 +1227,77 @@ def main():
         help='CSV file to validate (default: translations.csv)'
     )
 
+    # Setup schema command - generate FlatBuffer schema from C#
+    parser_setup_schema = subparsers.add_parser(
+        'setup_schema',
+        help='Parse C# files and generate FlatBuffer schema (.fbs file).'
+    )
+    parser_setup_schema.add_argument(
+        '--csharp',
+        type=str,
+        default='types.cs',
+        help='Input C# file with type definitions (default: types.cs)'
+    )
+    parser_setup_schema.add_argument(
+        '--output',
+        type=str,
+        default='generated_schema.fbs',
+        help='Output .fbs schema file (default: generated_schema.fbs)'
+    )
+
+    # Setup mapping command - create repository mapping
+    parser_setup_mapping = subparsers.add_parser(
+        'setup_mapping',
+        help='Create repository mapping from C# files.'
+    )
+    parser_setup_mapping.add_argument(
+        '--csharp',
+        type=str,
+        default='types.cs',
+        help='Input C# file (default: types.cs)'
+    )
+    parser_setup_mapping.add_argument(
+        '--output',
+        type=str,
+        default='repository_map.json',
+        help='Output mapping JSON file (default: repository_map.json)'
+    )
+
+    # Generate FlatBuffers command - generate Python modules
+    parser_gen_fb = subparsers.add_parser(
+        'generate_flatbuffers',
+        help='Generate Python modules from FlatBuffer schema with preprocessing.'
+    )
+    parser_gen_fb.add_argument(
+        '--schema',
+        type=str,
+        default='generated_schema.fbs',
+        help='Input .fbs schema file (default: generated_schema.fbs)'
+    )
+    parser_gen_fb.add_argument(
+        '--flatc',
+        type=str,
+        default='flatc.exe',
+        help='Path to flatc compiler (default: flatc.exe)'
+    )
+    parser_gen_fb.add_argument(
+        '--no-preprocess',
+        action='store_true',
+        help='Skip preprocessing (fixing reserved keywords)'
+    )
+
+    # Fix reserved names command - fix generated Python files
+    parser_fix_names = subparsers.add_parser(
+        'fix_reserved_names',
+        help='Fix Python reserved keywords in generated FlatBuffer files.'
+ ) + parser_fix_names.add_argument( + '--directory', + type=str, + default='MX', + help='Directory with generated Python files (default: MX)' + ) + # Parse arguments and execute appropriate command args = parser.parse_args() @@ -877,6 +1312,17 @@ def main(): import_from_csv(args.input, args.output, args.original) elif args.command == 'validate_csv': validate_csv(args.input) + elif args.command == 'setup_schema': + setup_schema_from_csharp(args.csharp, args.output) + elif args.command == 'setup_mapping': + setup_repository_mapping(args.csharp, args.output) + elif args.command == 'generate_flatbuffers': + if not args.no_preprocess: + print("Preprocessing schema to fix reserved keywords...") + preprocess_flatbuffer_schema(args.schema) + generate_flatbuffer_python(args.schema, args.flatc) + elif args.command == 'fix_reserved_names': + fix_flatbuffer_reserved_names(args.directory) else: print(f"ERROR: Unknown command '{args.command}'") parser.print_help() diff --git a/dumpdbschema.py b/dumpdbschema.py deleted file mode 100644 index e1e19ca..0000000 --- a/dumpdbschema.py +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/env python3 -""" -Database Schema Dumper - -This module parses C# files to extract repository and database schema information, -creating a mapping file that connects repositories to their corresponding -database tables and schema classes. Requires decompiled C# files to generate the schema. -""" - -import json -import os -import re -from collections import OrderedDict -from tqdm import tqdm - -# Configuration -CSHARP_FILE = 'types.cs' -OUTPUT_MAP_FILE = 'repository_map.json' - -def parse_csharp_files(): - """Parse C# file to extract repository and database schema information. - - Returns: - tuple: (repositories_dict, db_schemas_dict) containing parsed information - """ - if not os.path.exists(CSHARP_FILE): - print(f"ERROR: File '{CSHARP_FILE}' not found.") - return None, None - - # Regular expressions for parsing - repo_pattern = re.compile( - r'public class (\w+)\s*:\s*BaseExcelRepository<[^,]+,\s*([^,]+),\s*([^>]+)>' - ) - db_schema_pattern = re.compile(r'public class (\w+)\s*:\s*BaseDBSchema') - prop_pattern = re.compile(r'public\s+([\w.<>\[\]?]+)\s+(\w+)\s*{\s*get;\s*set;\s*}') - - db_schemas = OrderedDict() - repositories = OrderedDict() - current_db_schema = None - - print(f"Analyzing '{CSHARP_FILE}' to create repository mapping...") - - with open(CSHARP_FILE, 'r', encoding='utf-8') as f: - lines = f.readlines() - - for line in tqdm(lines, desc="Parsing C# file"): - # Remove comments - line = line.strip().split('//')[0] - - # Look for repository definitions - repo_match = repo_pattern.search(line) - if repo_match: - repo_name = repo_match.group(1) - db_schema_class = repo_match.group(2).strip() - blob_schema_class = repo_match.group(3).strip() - repositories[repo_name] = { - 'db_schema_class': db_schema_class, - 'blob_schema_class': blob_schema_class - } - continue - - # Look for database schema definitions - db_schema_match = db_schema_pattern.search(line) - if db_schema_match: - current_db_schema = db_schema_match.group(1) - db_schemas[current_db_schema] = [] - continue - - # If inside a database schema definition, look for properties - if current_db_schema: - prop_match = prop_pattern.search(line) - if prop_match: - prop_type = prop_match.group(1) - prop_name = prop_match.group(2) - db_schemas[current_db_schema].append({ - 'name': prop_name, - 'type': prop_type - }) - - # End of class definition - if '}' in line and '{' not in line: - current_db_schema = None - - 
print(f"\nFound {len(repositories)} repositories and {len(db_schemas)} database schemas.") - return repositories, db_schemas - -def main(): - """Main function to create the repository mapping file.""" - repos, schemas = parse_csharp_files() - - if not repos: - print("No repositories found. Nothing to save.") - return - - # Combine information into final mapping - final_map = OrderedDict() - - for repo_name, repo_data in repos.items(): - db_schema_name = repo_data['db_schema_class'] - - # Database table name is the database schema class name - table_name = db_schema_name - - # Find key columns for this schema - key_columns = schemas.get(db_schema_name, []) - - final_map[repo_name] = { - 'table_name': table_name, - 'key_columns': key_columns, - 'blob_schema_class': repo_data['blob_schema_class'] - } - - print(f"Saving repository mapping to '{OUTPUT_MAP_FILE}'...") - with open(OUTPUT_MAP_FILE, 'w', encoding='utf-8') as f: - json.dump(final_map, f, indent=2, ensure_ascii=False) - - print("Done! Repository mapping created successfully.") - print(f"You can now use '{OUTPUT_MAP_FILE}' as the source of truth for database operations.") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/generate_flatbuffer_folders.py b/generate_flatbuffer_folders.py deleted file mode 100644 index 986dc82..0000000 --- a/generate_flatbuffer_folders.py +++ /dev/null @@ -1,166 +0,0 @@ -#!/usr/bin/env python3 -""" -FlatBuffers Python Code Generator - -Generates Python modules from .fbs schema files for BA-translator. -""" - -import os -import sys -import subprocess -import shutil -import argparse -from pathlib import Path - - -def log_info(message, verbose=False): - """Print info message if verbose mode is enabled.""" - if verbose: - print(f"[INFO] {message}") - -def log_error(message): - """Print error message.""" - print(f"[ERROR] {message}") - -def check_flatc(flatc_path='flatc'): - """Check if flatc compiler is available.""" - try: - result = subprocess.run([flatc_path, '--version'], capture_output=True, text=True) - return result.returncode == 0 - except FileNotFoundError: - return False - -def get_namespace(fbs_file): - """Extract namespace from .fbs file.""" - try: - with open(fbs_file, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - if line.startswith('namespace '): - return line.replace('namespace ', '').replace(';', '').strip() - except Exception: - pass - return 'FlatData' - -def create_init_file(directory): - """Create __init__.py file in directory.""" - init_file = Path(directory) / '__init__.py' - if not init_file.exists(): - init_file.write_text('# Generated FlatBuffers Python package\n') - -def generate_python_modules(fbs_files, flatc_path='flatc', clean=False, verbose=False): - """Generate Python modules from .fbs files.""" - if not fbs_files: - log_error("No .fbs files provided") - return False - - if not check_flatc(flatc_path): - log_error(f"flatc compiler not found. 
Please install FlatBuffers.") - return False - - script_dir = Path(__file__).parent.absolute() - - # Clean existing files if requested - if clean: - for dirname in ['FlatData', 'MX']: - dir_path = script_dir / dirname - if dir_path.exists(): - log_info(f"Cleaning {dir_path}", verbose) - shutil.rmtree(dir_path) - - generated_dirs = set() - - for fbs_file in fbs_files: - if not Path(fbs_file).exists(): - log_error(f"Schema file not found: {fbs_file}") - return False - - log_info(f"Processing {fbs_file}", verbose) - - # Generate Python code using flatc - cmd = [ - flatc_path, - '--python', - '--gen-object-api', - '-o', str(script_dir), - fbs_file - ] - - try: - result = subprocess.run(cmd, capture_output=True, text=True) - - if result.returncode != 0: - log_error(f"flatc failed for {fbs_file}") - if result.stderr: - print(f"stderr: {result.stderr}") - return False - - # Get namespace and add to generated dirs - namespace = get_namespace(fbs_file) - generated_dirs.add(script_dir / namespace) - log_info(f"Generated modules for {fbs_file}", verbose) - - except Exception as e: - log_error(f"Exception running flatc: {e}") - return False - - # Create __init__.py files - for gen_dir in generated_dirs: - if gen_dir.exists(): - create_init_file(gen_dir) - log_info(f"Created __init__.py in {gen_dir}", verbose) - - return True - -def find_fbs_files(directory='.'): - """Find all .fbs files in directory.""" - fbs_files = [] - for root, _, files in os.walk(directory): - for file in files: - if file.endswith('.fbs'): - fbs_files.append(os.path.join(root, file)) - return fbs_files - - -def main(): - """Main entry point.""" - parser = argparse.ArgumentParser( - description='Generate Python modules from FlatBuffers schema files') - - parser.add_argument('fbs_files', nargs='*', help='.fbs schema files to process') - parser.add_argument('--auto', action='store_true', help='Auto-find all .fbs files') - parser.add_argument('--flatc-path', default='flatc', help='Path to flatc compiler') - parser.add_argument('--clean', action='store_true', help='Clean existing files first') - parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output') - - args = parser.parse_args() - - # Get .fbs files - if args.auto: - fbs_files = find_fbs_files() - if not fbs_files: - log_error("No .fbs files found") - sys.exit(1) - log_info(f"Found {len(fbs_files)} .fbs files", args.verbose) - elif args.fbs_files: - fbs_files = args.fbs_files - else: - parser.print_help() - sys.exit(1) - - # Generate modules - success = generate_python_modules( - fbs_files=fbs_files, - flatc_path=args.flatc_path, - clean=args.clean, - verbose=args.verbose - ) - - if success: - print("[SUCCESS] Python modules generated successfully") - else: - sys.exit(1) - - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/parser.py b/parser.py deleted file mode 100644 index 73e0f3b..0000000 --- a/parser.py +++ /dev/null @@ -1,382 +0,0 @@ -#!/usr/bin/env python3 -""" -C# to FlatBuffers Schema Parser - -This module parses C# files containing FlatBuffers object definitions and generates -corresponding .fbs schema files. Requires decompiled C# files to generate the schema. 
-""" - -import re -import os -from collections import defaultdict -from unidecode import unidecode - -# Configuration -INPUT_CSHARP_FILE = 'types.cs' -OUTPUT_FBS_FILE = 'generated_schema.fbs' -DEFAULT_NAMESPACE = 'FlatData' - -# Type mapping from C# to FlatBuffers -CSHARP_TO_FBS_TYPE_MAP = { - 'long': 'long', 'ulong': 'ulong', 'int': 'int', 'uint': 'uint', - 'short': 'short', 'ushort': 'ushort', 'float': 'float', 'double': 'double', - 'bool': 'bool', 'string': 'string', 'byte': 'ubyte', 'sbyte': 'byte' -} - -def sanitize_identifier(name): - """Clean identifier names for FlatBuffers compatibility.""" - return re.sub(r'[^A-Za-z0-9_.]', '_', unidecode(name)) - -def pascal_to_snake_case(name): - """Convert PascalCase to snake_case.""" - name = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1_\2', name) - name = re.sub(r'([a-z\d])([A-Z])', r'\1_\2', name) - name = name.replace('-', '_') - return name.lower() - -def csharp_to_fbs_type(csharp_type): - """Convert C# type to FlatBuffers type.""" - if csharp_type is None: - return 'int' - - # Remove nullable indicators - csharp_type = csharp_type.replace('?', '') - - # Check direct mappings - if csharp_type in CSHARP_TO_FBS_TYPE_MAP: - return CSHARP_TO_FBS_TYPE_MAP[csharp_type] - - # Handle custom types - return sanitize_identifier(csharp_type) - - -def parse_csharp_file(input_file): - """Parse C# file and extract table and enum definitions. - - Args: - input_file (str): Path to C# input file - - Returns: - dict: Dictionary of parsed definitions with full names as keys - """ - all_definitions = {} - - with open(input_file, 'r', encoding='utf-8') as f: - current_namespace = "_GLOBAL_" - in_block = None - current_name = None - current_fields = [] - current_enum_base_type = 'int' - seen_enum_values = set() - - for line in f: - line = line.strip() - - # Parse namespace declarations - ns_match = re.match(r'namespace (\S+)', line) - if ns_match: - current_namespace = ns_match.group(1).replace(';', '') - continue - - # End of block - if line.startswith('}') and in_block: - full_name = f"{current_namespace}.{current_name}" - if in_block == 'table': - all_definitions[full_name] = { - 'type': 'table', - 'ns': current_namespace, - 'name': current_name, - 'fields': current_fields - } - elif in_block == 'enum': - all_definitions[full_name] = { - 'type': 'enum', - 'ns': current_namespace, - 'name': current_name, - 'base': current_enum_base_type, - 'fields': current_fields - } - - in_block = None - current_fields = [] - continue - - # Parse struct/table definitions - table_match = re.search(r'public struct (\w+) : IFlatbufferObject', line) - enum_match = re.search(r'public enum (\w+)(?:\s*:\s*(\w+))?', line) - - if table_match: - in_block = 'table' - current_name = sanitize_identifier(table_match.group(1)) - continue - elif enum_match: - in_block = 'enum' - current_name = sanitize_identifier(enum_match.group(1)) - csharp_base_type = enum_match.group(2) - current_enum_base_type = csharp_to_fbs_type(csharp_base_type) - seen_enum_values.clear() - continue - - if not in_block: - continue - - # Parse enum fields - if in_block == 'enum': - field_match = re.match(r'(\w+)\s*=\s*(-?\d+)', line) - if field_match: - field_name = sanitize_identifier(field_match.group(1)) - field_value = int(field_match.group(2)) - - if field_value not in seen_enum_values: - seen_enum_values.add(field_value) - current_fields.append(f'{field_name} = {field_value}') - continue - - # Parse table fields - if in_block == 'table': - if not line.startswith('public'): - continue - - # Parse vector methods - 
vec_match = re.search( - r'public\s+(?:[^\s<]+<(\S+)>|(\S+))\s+(\w+)\s*\(int\s+\w+\)', - line - ) - if vec_match: - csharp_type = vec_match.group(1) if vec_match.group(1) else vec_match.group(2) - fbs_type = csharp_to_fbs_type(csharp_type) - current_fields.append({ - 'original': sanitize_identifier(vec_match.group(3)), - 'type': f'[{fbs_type}]' - }) - continue - - # Parse property definitions - prop_match = re.search( - r'public\s+(?:Nullable<(\S+)>|ArraySegment|(\S+))\s+(\w+)\s*{', - line - ) - if prop_match: - nullable_type, full_type, csharp_name = prop_match.groups() - csharp_type = nullable_type if nullable_type else full_type - - # Skip internal FlatBuffers fields - if csharp_name == 'ByteBuffer' or csharp_name.endswith('Length'): - continue - - # Determine field type - if csharp_type == 'ArraySegment': - field_type = '[ubyte]' - else: - field_type = csharp_to_fbs_type(csharp_type) - - current_fields.append({ - 'original': sanitize_identifier(csharp_name), - 'type': field_type - }) - continue - - # Handle global namespace - if "_GLOBAL_" in {d['ns'] for d in all_definitions.values()}: - for name, data in list(all_definitions.items()): - if data['ns'] == "_GLOBAL_": - new_name = f"{DEFAULT_NAMESPACE}.{data['name']}" - all_definitions[new_name] = data - data['ns'] = DEFAULT_NAMESPACE - del all_definitions[name] - - return all_definitions - - -def find_full_type_name(base_type, current_ns, all_defs): - """Find the full qualified name for a type reference. - - Args: - base_type (str): Base type name to find - current_ns (str): Current namespace context - all_defs (dict): All available type definitions - - Returns: - str or None: Full qualified type name if found - """ - # Try current namespace first - if f"{current_ns}.{base_type}" in all_defs: - return f"{current_ns}.{base_type}" - - # Try default namespace - if f"{DEFAULT_NAMESPACE}.{base_type}" in all_defs: - return f"{DEFAULT_NAMESPACE}.{base_type}" - - # Try global scope - if base_type in all_defs: - return base_type - - # Search in all namespaces - for name in all_defs: - if name.endswith(f".{base_type}"): - return name - - return None - -def generate_fbs_schema(all_definitions, output_file): - """Generate FlatBuffers schema file from parsed definitions. 
- - Args: - all_definitions (dict): All parsed type definitions - output_file (str): Path to output .fbs file - """ - # Step 1: Filter and resolve dependencies - root_types = {name for name, data in all_definitions.items() if data['type'] == 'table'} - used_types = set() - queue = list(root_types) - - while queue: - type_name = queue.pop(0) - if type_name in used_types or type_name not in all_definitions: - continue - - used_types.add(type_name) - data = all_definitions[type_name] - - if data['type'] == 'table': - for field in data['fields']: - base_type = field['type'].strip('[]') - found_dep = find_full_type_name(base_type, data['ns'], all_definitions) - if found_dep and found_dep not in used_types: - queue.append(found_dep) - - final_definitions = {name: data for name, data in all_definitions.items() if name in used_types} - - # Step 2: Separate tables and enums - tables = {name: data for name, data in final_definitions.items() if data['type'] == 'table'} - enums = {name: data for name, data in final_definitions.items() if data['type'] == 'enum'} - - # Step 3: Topological sort for dependency order - in_degree = {t: 0 for t in tables} - adj = defaultdict(list) - - for name, data in tables.items(): - for field in data['fields']: - base_type = field['type'].strip('[]') - found_dep = find_full_type_name(base_type, data['ns'], tables) - if found_dep: - adj[found_dep].append(name) - in_degree[name] += 1 - - # Topological sort - queue = [t for t in tables if in_degree[t] == 0] - sorted_tables = [] - - while queue: - t = queue.pop(0) - sorted_tables.append(t) - for neighbor in adj.get(t, []): - in_degree[neighbor] -= 1 - if in_degree[neighbor] == 0: - queue.append(neighbor) - - # Handle cycles - cyclic_tables = set(tables.keys()) - set(sorted_tables) - sorted_tables.extend(list(cyclic_tables)) - - # Step 4: Group by namespace - defs_by_ns = defaultdict(lambda: {'enums': [], 'tables': [], 'cycles': []}) - - for name, data in enums.items(): - defs_by_ns[data['ns']]['enums'].append(data) - - for name in sorted_tables: - data = tables[name] - defs_by_ns[data['ns']]['tables'].append(data) - if name in cyclic_tables: - defs_by_ns[data['ns']]['cycles'].append(data['name']) - - # Step 5: Generate FlatBuffers schema file - with open(output_file, 'w', encoding='utf-8') as f: - f.write('// Auto-generated FlatBuffers schema\n') - f.write('// Field order is preserved. 
Key attributes are properly handled.\n\n') - - for ns, data in sorted(defs_by_ns.items()): - f.write(f'// ----- NAMESPACE: {ns} -----\n') - f.write(f'namespace {ns};\n\n') - - # Forward declarations for circular dependencies - if data['cycles']: - f.write('// Forward declarations for circular dependencies\n') - for table_name in sorted(data['cycles']): - f.write(f'table {table_name};\n') - f.write('\n') - - # Enums - if data['enums']: - f.write('// --- Enums ---\n') - for definition in sorted(data['enums'], key=lambda x: x['name']): - f.write(f'enum {definition["name"]} : {definition["base"]} {{\n') - for field in definition['fields']: - f.write(f' {field},\n') - f.write('}\n\n') - - # Tables - if data['tables']: - f.write('// --- Tables ---\n') - for definition in data['tables']: - f.write(f'table {definition["name"]} {{\n') - - # Handle field naming conflicts - snake_to_original = defaultdict(list) - for field in definition['fields']: - snake_to_original[pascal_to_snake_case(field['original'])].append(field['original']) - - # Track if key attribute was added - key_field_added = False - - for field in definition['fields']: - snake_name = pascal_to_snake_case(field['original']) - field_name = (field['original'] if len(snake_to_original[snake_name]) > 1 - else snake_name) - - is_array = field['type'].startswith('[') - base_type = field['type'].strip('[]') - final_type_str = field['type'] - - # Resolve type references - full_dep_name = find_full_type_name(base_type, definition['ns'], final_definitions) - if full_dep_name: - dep_data = final_definitions[full_dep_name] - simple_name = dep_data['name'] - - if dep_data['ns'] != definition['ns']: - final_type_str = f"{dep_data['ns']}.{simple_name}" - else: - final_type_str = simple_name - - if is_array: - final_type_str = f"[{final_type_str}]" - - # Add key attribute for primary key fields - key_suffix = "" - if (not key_field_added and - field_name.lower() in ['key', 'id'] and - not is_array): - key_suffix = " (key)" - key_field_added = True - - f.write(f' {field_name}:{final_type_str}{key_suffix};\n') - - f.write('}\n\n') - - print(f"Success! Schema with {len(final_definitions)} types saved to {output_file}") - -def main(): - """Main function to run the parser.""" - if not os.path.exists(INPUT_CSHARP_FILE): - print(f"Error: Input file '{INPUT_CSHARP_FILE}' not found.") - return - - print("Starting C# parsing...") - all_definitions = parse_csharp_file(INPUT_CSHARP_FILE) - print(f"Parsed {len(all_definitions)} definitions. Generating .fbs schema...") - generate_fbs_schema(all_definitions, OUTPUT_FBS_FILE) - -if __name__ == '__main__': - main() \ No newline at end of file