diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e91d2c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,45 @@ +# Generated FlatBuffers modules +FlatData/ +MX/ +generated_schema.fbs + +# Database and data files +ExcelDB.db +ExcelDB.db.bak +types.cs +repository_map.json + +# Translation files +translations*.json +translations*.csv + +# FlatBuffers compiler +flatc.exe +flatc + +# Python cache +__pycache__/ +*.pyc +*.pyo +*.pyd +.Python + +# Virtual environments +venv/ +env/ +.env + +# IDE files +.vscode/ +.idea/ +*.sublime-* + +# OS files +.DS_Store +Thumbs.db +desktop.ini + +# Temporary files +*.tmp +*.temp +*~ \ No newline at end of file diff --git a/BAtranslator.py b/BAtranslator.py new file mode 100644 index 0000000..d4aa04b --- /dev/null +++ b/BAtranslator.py @@ -0,0 +1,894 @@ +#!/usr/bin/env python3 +""" +BA Translator - A tool for extracting and applying translations for Blue Archive. +""" + +import sys +import os +import sqlite3 +import json +from collections import OrderedDict, defaultdict +import importlib +import shutil +import re +import argparse +from tqdm import tqdm +import flatbuffers +import csv +from pathlib import Path + +# Add script directory to path for schema imports +script_dir = Path(__file__).parent.absolute() +if str(script_dir) not in sys.path: + sys.path.append(str(script_dir)) + +# Global configuration +DB_FILE = 'ExcelDB.db' +DB_BACKUP_FILE = 'ExcelDB.db.bak' +DEFAULT_JSON_FILE = 'translations.json' +REPO_MAP_FILE = 'repository_map.json' +BLOB_COLUMN = 'Bytes' + +# Schema location mapping +SCHEMA_LOCATION_MAP = {} + +def build_schema_map(): + """Build a mapping of schema class names to their module paths.""" + for root, _, files in os.walk('.'): + for filename in files: + if filename.endswith('.py') and filename != '__init__.py': + class_name = filename[:-3] + relative_path = os.path.relpath(root, '.') + if relative_path == '.': + module_path = class_name + else: + module_path = os.path.normpath(relative_path).replace(os.sep, '.') + '.' + class_name + SCHEMA_LOCATION_MAP[class_name] = module_path + +build_schema_map() + + +# ============================================================================ +# UTILITY FUNCTIONS +# ============================================================================ + +# Patterns for special character handling +CONTROL_CHAR_PATTERN = re.compile(r'[\x00-\x1f\x7F-\x9F]+') +STRICT_ASCII_FILTER_PATTERN = re.compile(r'^[\s\x21-\x7E{}:]*$') + +def validate_required_files(*file_paths): + """Validate that all required files exist. + + Args: + *file_paths: Variable number of file paths to validate + + Returns: + bool: True if all files exist, False otherwise + """ + missing_files = [] + for file_path in file_paths: + if not os.path.exists(file_path): + missing_files.append(file_path) + + if missing_files: + for file_path in missing_files: + print(f"ERROR: Required file '{file_path}' not found.") + return False + return True + +def encode_special_chars(text): + """Encode special control characters in text for safe JSON storage. 
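+
+    Control characters are replaced with indexed placeholders of the form
+    {C:n}; decode_special_chars() reverses the mapping. Illustrative round
+    trip (the input string is hypothetical):
+
+        clean, codes = encode_special_chars("Line1\nLine2")
+        # clean == "Line1{C:0}Line2", codes == ["\n"]
+        decode_special_chars(clean, codes)  # -> "Line1\nLine2"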
+ + Args: + text (str): Input text that may contain control characters + + Returns: + tuple: (clean_text, codes_list) where codes_list contains the encoded chars + """ + if not text: + return text, [] + + codes = CONTROL_CHAR_PATTERN.findall(text) + if not codes: + return text, [] + + def replacer_func(match): + index = len(replacer_func.codes) + replacer_func.codes.append(match.group(0)) + return f"{{C:{index}}}" + + replacer_func.codes = [] + clean_text = CONTROL_CHAR_PATTERN.sub(replacer_func, text) + return clean_text, replacer_func.codes + +def decode_special_chars(clean_text, codes): + """Decode special characters back into text. + + Args: + clean_text (str): Text with encoded placeholders + codes (list): List of original control characters + + Returns: + str: Text with control characters restored + """ + if not clean_text or not codes: + return clean_text + + placeholder_pattern = re.compile(r'{C:(\d+)}') + + def replacer_func(match): + index = int(match.group(1)) + if index < len(codes): + return codes[index] + return match.group(0) + + return placeholder_pattern.sub(replacer_func, clean_text) + +def flatbuffer_to_dict(obj): + """Recursively convert a FlatBuffer object to a Python dictionary. + + Args: + obj: FlatBuffer object to convert + + Returns: + dict or primitive: Converted object + """ + if obj is None or isinstance(obj, (int, float, bool, str)): + return obj + + if isinstance(obj, bytes): + return obj.decode('utf-8', 'ignore') + + result = OrderedDict() + + # Get all public methods that look like FlatBuffer accessors + for method_name in dir(obj): + if not method_name[0].isupper(): + continue + + method = getattr(obj, method_name) + if not callable(method) or method.__code__.co_argcount != 1: + continue + + try: + value = method() + + # Handle array-like values + if hasattr(value, 'Length') and callable(getattr(value, 'Length')): + result[method_name] = [ + flatbuffer_to_dict(value(i)) + for i in range(value.Length()) + ] + else: + result[method_name] = flatbuffer_to_dict(value) + + except Exception: + # Skip methods that fail to call + continue + + return result + +def dict_to_flatbuffer(builder, data_dict, schema_class): + """Build a FlatBuffer from a dictionary using the schema class. 
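+
+    Only flat records of scalars and strings are rebuilt; nested tables and
+    vectors are not re-serialized. Minimal sketch of how patch_database()
+    uses it, assuming an illustrative flatc-generated class CharacterExcel:
+
+        builder = flatbuffers.Builder(1024)
+        offset = dict_to_flatbuffer(builder, {"Name": "Aru"}, CharacterExcel)
+        builder.Finish(offset)
+        new_blob = bytes(builder.Output())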
+ + Args: + builder: FlatBuffer builder instance + data_dict (dict): Data to serialize + schema_class: FlatBuffer schema class + + Returns: + int: Offset of the created object + """ + schema_name = schema_class.__name__ + schema_module = sys.modules[schema_class.__module__] + + # Determine field order from the schema + field_order = [] + if hasattr(schema_class, f'GetRootAs{schema_name}'): + add_methods = [ + m for m in dir(schema_module) + if m.startswith(f"{schema_name}Add") + ] + field_order = [ + m.replace(f"{schema_name}Add", "") + for m in reversed(add_methods) + ] + + # Pre-create string pointers + string_pointers = {} + for key, value in data_dict.items(): + if isinstance(value, str): + string_pointers[key] = builder.CreateString(value) + + # Start building the object + start_method = getattr(schema_module, f"{schema_name}Start") + start_method(builder) + + # Add fields in the correct order + for field_name in field_order: + if field_name in data_dict: + add_method = getattr(schema_module, f"{schema_name}Add{field_name}") + value = data_dict[field_name] + + if field_name in string_pointers: + add_method(builder, string_pointers[field_name]) + else: + add_method(builder, value) + + # Finish building + end_method = getattr(schema_module, f"{schema_name}End") + return end_method(builder) + +def create_translation_memory(old_json_data): + """Create translation memory from existing JSON data. + + Args: + old_json_data (dict): Previously translated data + + Returns: + dict: Mapping of original text to translations + """ + memory = {} + print("Creating translation memory from existing data...") + + for table_data in old_json_data.values(): + for row_data in table_data.values(): + for field_content in row_data.values(): + if not isinstance(field_content, dict): + continue + if 'original' not in field_content or 'translation' not in field_content: + continue + + original_struct = field_content['original'] + translation_text = field_content['translation'] + + # Extract original text + if isinstance(original_struct, str): + original_text = original_struct + else: + original_text = original_struct.get('text') + + # Store translation if it exists and differs from original + if translation_text and translation_text != original_text: + memory[original_text] = translation_text + + print(f"Translation memory created with {len(memory)} unique translations.") + return memory + + +# ============================================================================ +# MAIN FUNCTIONS +# ============================================================================ + +def extract_strings(output_file, filter_str=None, update_from=None): + """Extract translatable strings from the database. 
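+
+    Typical calls (file and table names are illustrative):
+
+        extract_strings("translations.json")
+        extract_strings("translations.json", filter_str="is_ascii")
+        extract_strings("translations.json", filter_str="table_name:SomeTable")
+        extract_strings("translations.json", update_from="translations_old.json")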
+ + Args: + output_file (str): Path to output JSON file + filter_str (str): Optional filter (e.g., 'is_ascii', 'table_name:TableName') + update_from (str): Path to existing JSON file to merge translations from + """ + # Validate required files + if not validate_required_files(REPO_MAP_FILE, DB_FILE): + return + + # Load existing translations if specified + translation_memory = {} + if update_from: + if os.path.exists(update_from): + with open(update_from, 'r', encoding='utf-8') as f: + old_data = json.load(f) + translation_memory = create_translation_memory(old_data) + else: + print(f"WARNING: Update file '{update_from}' not found.") + + # Parse filter + filter_type, filter_value = _parse_filter(filter_str) + + # Load repository mapping and connect to database + with open(REPO_MAP_FILE, 'r', encoding='utf-8') as f: + repo_map = json.load(f) + + conn = sqlite3.connect(DB_FILE) + cursor = conn.cursor() + translations_dict = OrderedDict() + + try: + print("Extracting translatable strings...") + for repo_info in tqdm(repo_map.values(), desc="Processing repositories"): + table_name = repo_info['table_name'] + + # Apply table filter + if filter_type == 'table_name' and table_name != filter_value: + continue + + table_translations = _process_table(cursor, repo_info, filter_type, filter_value, translation_memory) + if table_translations: + translations_dict[table_name] = table_translations + finally: + conn.close() + + if not translations_dict: + print("No strings found matching the filter.") + return + + # Save results + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(translations_dict, f, indent=2, ensure_ascii=False) + + print(f"\nSuccess! Translation data saved to '{output_file}'.") + +def _parse_filter(filter_str): + """Parse filter string into type and value.""" + if not filter_str: + return None, None + + if ':' in filter_str: + filter_type, filter_value = filter_str.split(':', 1) + else: + filter_type, filter_value = filter_str, None + + print(f"Applying filter: type='{filter_type}', value='{filter_value}'") + return filter_type, filter_value + +def _process_table(cursor, repo_info, filter_type, filter_value, translation_memory): + """Process a single table and extract translatable strings.""" + table_name = repo_info['table_name'] + blob_schema_name = repo_info['blob_schema_class'] + + try: + # Load schema module + module_path = SCHEMA_LOCATION_MAP.get(blob_schema_name) + if not module_path: + return None + + schema_module = importlib.import_module(module_path) + schema_class = getattr(schema_module, blob_schema_name) + get_root_method = getattr(schema_class, f"GetRootAs{blob_schema_name}") + + # Process table rows + cursor.execute(f'SELECT rowid, "{BLOB_COLUMN}" FROM "{table_name}"') + table_translations = OrderedDict() + + for row_id, blob_data in cursor.fetchall(): + if not blob_data: + continue + + try: + # Parse FlatBuffer and extract strings + fbs_obj = get_root_method(blob_data, 0) + data_dict = flatbuffer_to_dict(fbs_obj) + string_fields = _extract_string_fields(data_dict, filter_type, filter_value, translation_memory) + + if string_fields: + table_translations[row_id] = string_fields + + except Exception: + # Skip rows that can't be processed + continue + + return table_translations if table_translations else None + + except (ImportError, AttributeError): + # Skip tables that can't be processed + return None + +def _extract_string_fields(data_dict, filter_type, filter_value, translation_memory): + """Extract and filter string fields from FlatBuffer data.""" + 
string_fields = OrderedDict()
+
+    for field, value in data_dict.items():
+        if not isinstance(value, str) or not value:
+            continue
+
+        clean_text, codes = encode_special_chars(value)
+
+        # Apply content filters
+        if not _passes_filter(clean_text, filter_type, filter_value):
+            continue
+
+        # Create original entry
+        original_entry = {"text": clean_text, "codes": codes} if codes else clean_text
+
+        # Get existing translation
+        existing_translation = translation_memory.get(clean_text, "")
+
+        string_fields[field] = {
+            "original": original_entry,
+            "translation": existing_translation
+        }
+
+    return string_fields
+
+def _passes_filter(text, filter_type, filter_value):
+    """Check if text passes the specified filter."""
+    if filter_type is None:
+        return True
+    elif filter_type == 'is_ascii':
+        return bool(STRICT_ASCII_FILTER_PATTERN.match(text))
+    elif filter_type == 'contains_text':
+        return filter_value in text
+    return True
+
+
+def patch_database(input_file):
+    """Apply translations from JSON file to the database.
+
+    Args:
+        input_file (str): Path to JSON file containing translations
+    """
+    if not validate_required_files(REPO_MAP_FILE, input_file, DB_FILE):
+        return
+
+    print(f"--- PATCHING MODE: '{input_file}' -> '{DB_FILE}' ---")
+
+    # Confirm operation
+    response = input("Are you sure? A backup will be created. 
(yes/no): ").lower() + if response not in ['yes', 'y']: + print("Operation cancelled.") + return + + # Create backup + print(f"Creating backup '{DB_BACKUP_FILE}'...") + shutil.copyfile(DB_FILE, DB_BACKUP_FILE) + + # Load data + with open(REPO_MAP_FILE, 'r', encoding='utf-8') as f: + repo_map = {v['table_name']: v for v in json.load(f).values()} + + with open(input_file, 'r', encoding='utf-8') as f: + translations = json.load(f) + + # Find changes to apply + changes_to_apply = [] + for table_name, table_data in translations.items(): + for row_id_str, fields in table_data.items(): + changed_fields = {} + for field, content in fields.items(): + if (isinstance(content, dict) and 'original' in content and + content.get('translation') and + content['translation'] != (content['original'] if isinstance(content['original'], str) + else content['original'].get('text', ''))): + # Decode special characters + original_struct = content['original'] + codes = original_struct.get('codes', []) if isinstance(original_struct, dict) else [] + final_text = decode_special_chars(content['translation'], codes) + changed_fields[field] = final_text + + if changed_fields: + changes_to_apply.append({ + 'table': table_name, + 'row_id': int(row_id_str), + 'fields': changed_fields + }) + + if not changes_to_apply: + print("No changes found to apply.") + return + + print(f"Found {len(changes_to_apply)} records to update.") + + # Apply changes + conn = sqlite3.connect(DB_FILE) + cursor = conn.cursor() + updated_count = 0 + + try: + for change in tqdm(changes_to_apply, desc="Applying changes"): + table_name = change['table'] + row_id = change['row_id'] + fields = change['fields'] + + if table_name not in repo_map: + continue + + try: + repo_info = repo_map[table_name] + module_path = SCHEMA_LOCATION_MAP.get(repo_info['blob_schema_class']) + if not module_path: + continue + + schema_module = importlib.import_module(module_path) + schema_class = getattr(schema_module, repo_info['blob_schema_class']) + get_root_method = getattr(schema_class, f"GetRootAs{repo_info['blob_schema_class']}") + + # Get and update data + cursor.execute(f'SELECT "{BLOB_COLUMN}" FROM "{table_name}" WHERE rowid = ?', (row_id,)) + result = cursor.fetchone() + if not result or not result[0]: + continue + + fbs_obj = get_root_method(result[0], 0) + data_dict = flatbuffer_to_dict(fbs_obj) + data_dict.update(fields) + + # Rebuild and save + builder = flatbuffers.Builder(1024) + new_offset = dict_to_flatbuffer(builder, data_dict, schema_class) + builder.Finish(new_offset) + + cursor.execute(f'UPDATE "{table_name}" SET "{BLOB_COLUMN}" = ? WHERE rowid = ?', + (bytes(builder.Output()), row_id)) + updated_count += 1 + + except Exception: + continue + + conn.commit() + print(f"\nSuccess! 
Updated {updated_count} database entries.") + + except Exception as e: + conn.rollback() + print(f"ERROR during patching: {e}") + finally: + conn.close() + + +# ============================================================================ +# CSV EXPORT/IMPORT FUNCTIONS +# ============================================================================ + + +def export_to_csv(json_file, csv_file): + """Export JSON translation file to CSV format for translators.""" + if not validate_required_files(json_file): + return + + with open(json_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + print(f"Exporting translations to '{csv_file}'...") + + # Process all records + all_rows = [] + text_groups = {} + group_id = 1 + + for table_name, table_data in data.items(): + for row_id, row_data in table_data.items(): + for field, content in row_data.items(): + if isinstance(content, dict) and 'original' in content: + original = content['original'] + text = original if isinstance(original, str) else original.get('text', '') + codes = [] if isinstance(original, str) else original.get('codes', []) + + if text not in text_groups: + text_groups[text] = group_id + group_id += 1 + + all_rows.append([ + text_groups[text], text, content.get('translation', ''), + table_name, row_id, field, 'Y' if codes else '', + json.dumps(codes) if codes else '' + ]) + + # Write CSV files + _write_csv_files(csv_file, all_rows, text_groups) + print(f"Export completed! Unique texts: {len(text_groups)}") + +def _write_csv_files(csv_file, all_rows, text_groups): + """Write main CSV and simplified translator version.""" + # Main CSV + with open(csv_file, 'w', encoding='utf-8-sig', newline='') as f: + writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_ALL) + writer.writerow(['GroupID', 'Original', 'Translation', 'SQLTable', 'RowID', 'Field', 'HasCodes', 'Codes']) + writer.writerows(sorted(all_rows, key=lambda x: x[0])) + + # Simplified translator CSV + translator_csv = csv_file.replace('.csv', '_for_translators.csv') + unique_texts = {} + for row in all_rows: + text = row[1] + if text not in unique_texts: + unique_texts[text] = [text_groups[text], text, row[2], set()] + unique_texts[text][3].add(row[3]) + + with open(translator_csv, 'w', encoding='utf-8-sig', newline='') as f: + writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_ALL) + writer.writerow(['GroupID', 'Original', 'Translation', 'Tables']) + for text, info in sorted(unique_texts.items(), key=lambda x: x[1][0]): + writer.writerow([info[0], info[1], info[2], '|'.join(sorted(info[3]))]) + + print(f"Translator version: {translator_csv}") + +def import_from_csv(csv_file, json_file, original_json_file=None): + """Import translations from CSV file.""" + if not original_json_file: + original_json_file = json_file + + if not validate_required_files(csv_file, original_json_file): + return + + # Load data + with open(original_json_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + # Load translations + is_simple = '_for_translators' in csv_file + translations = {} + + with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f: + reader = csv.reader(f, delimiter=';', quoting=csv.QUOTE_ALL) + next(reader) # Skip header + + for row in reader: + if len(row) >= 3 and row[2]: # Has translation + if is_simple: + translations[row[1]] = row[2] # original -> translation + elif len(row) >= 6: + key = f"{row[3]}:{row[4]}:{row[5]}" # table:row:field + codes = json.loads(row[7]) if len(row) > 7 and row[7] else [] + translations[key] = {'original': row[1], 'translation': 
row[2], 'codes': codes} + + # Apply translations + updated_count = 0 + for table_name, table_data in data.items(): + for row_id_str, row_data in table_data.items(): + for field, content in row_data.items(): + if isinstance(content, dict) and 'original' in content: + original = content['original'] + text = original if isinstance(original, str) else original.get('text', '') + + new_translation = None + if is_simple and text in translations: + new_translation = translations[text] + elif not is_simple: + key = f"{table_name}:{row_id_str}:{field}" + if key in translations and translations[key]['original'] == text: + new_translation = translations[key]['translation'] + + if new_translation and new_translation != content.get('translation', ''): + content['translation'] = new_translation + updated_count += 1 + + # Save result + with open(json_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + print(f"Success! Updated {updated_count} translations.") + +def validate_csv(csv_file): + """Check CSV file for consistency and issues.""" + if not validate_required_files(csv_file): + return + + print(f"Validating '{csv_file}'...") + + groups = {} + total_rows = 0 + + with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f: + reader = csv.reader(f, delimiter=';', quoting=csv.QUOTE_ALL) + next(reader) # Skip header + + for row in reader: + if len(row) >= 3: + total_rows += 1 + group_id, original, translation = row[0], row[1], row[2] + + if group_id not in groups: + groups[group_id] = {'original': original, 'translations': set()} + + if translation: + groups[group_id]['translations'].add(translation) + + # Find issues + issues = [issue for group_id, info in groups.items() + if len(info['translations']) > 1 + for issue in [{'group_id': group_id, 'original': info['original'], + 'translations': list(info['translations'])}]] + + translated_groups = sum(1 for info in groups.values() if info['translations']) + + # Report results + print(f"\n=== VALIDATION RESULTS ===") + print(f"Total rows: {total_rows}") + print(f"Unique groups: {len(groups)}") + print(f"Translated groups: {translated_groups}") + print(f"Untranslated groups: {len(groups) - translated_groups}") + + if issues: + print(f"\n=== FOUND ISSUES: {len(issues)} ===") + for issue in issues[:5]: + print(f"\nGroup {issue['group_id']}: {issue['original'][:50]}...") + print("Different translations:") + for trans in issue['translations']: + print(f" - {trans}") + + if len(issues) > 5: + print(f"\n... and {len(issues) - 5} more issues") + + # Save detailed report + report_file = csv_file.replace('.csv', '_issues.txt') + with open(report_file, 'w', encoding='utf-8') as f: + f.write("=== ISSUE REPORT ===\n\n") + for issue in issues: + f.write(f"Group {issue['group_id']}: {issue['original']}\n") + f.write("Different translations:\n") + for trans in issue['translations']: + f.write(f" - {trans}\n") + f.write("\n") + print(f"\nDetailed report saved to: {report_file}") + else: + print("\nNo issues found!") + +def main(): + """Main CLI entry point for the BA-translator tool. + + Provides commands for extracting, translating, and patching game strings + using CSV workflow for translators. + """ + parser = argparse.ArgumentParser( + description="Game localization tool for Blue Archive." 
+ ) + subparsers = parser.add_subparsers( + dest='command', + required=True, + help='Available commands' + ) + + # Extract command - extract strings from database to JSON + parser_extract = subparsers.add_parser( + 'extract', + help='Extract translatable strings from database to JSON file.' + ) + parser_extract.add_argument( + '--filter', + type=str, + help='Filter for extraction. Formats: is_ascii, table_name:TableName, contains_text:Word' + ) + parser_extract.add_argument( + '--output', + type=str, + default=DEFAULT_JSON_FILE, + help=f'Output JSON file name (default: {DEFAULT_JSON_FILE})' + ) + parser_extract.add_argument( + '--update-from', + type=str, + help='Path to existing JSON file to merge translations from.' + ) + + # Patch command - apply translations to database + parser_patch = subparsers.add_parser( + 'patch', + help='Apply translations from JSON file to the database.' + ) + parser_patch.add_argument( + '--input', + type=str, + default=DEFAULT_JSON_FILE, + help=f'Input JSON file name (default: {DEFAULT_JSON_FILE})' + ) + + # CSV export command - convert JSON to CSV for translators + parser_export_csv = subparsers.add_parser( + 'export_csv', + help='Export JSON translations to CSV format for translators.' + ) + parser_export_csv.add_argument( + '--input', + type=str, + default=DEFAULT_JSON_FILE, + help=f'Input JSON file (default: {DEFAULT_JSON_FILE})' + ) + parser_export_csv.add_argument( + '--output', + type=str, + default='translations.csv', + help='Output CSV file (default: translations.csv)' + ) + + # CSV import command - convert CSV back to JSON + parser_import_csv = subparsers.add_parser( + 'import_csv', + help='Import translations from CSV back to JSON format.' + ) + parser_import_csv.add_argument( + '--input', + type=str, + default='translations.csv', + help='Input CSV file (default: translations.csv)' + ) + parser_import_csv.add_argument( + '--output', + type=str, + default=DEFAULT_JSON_FILE, + help=f'Output JSON file (default: {DEFAULT_JSON_FILE})' + ) + parser_import_csv.add_argument( + '--original', + type=str, + help='Original JSON file for structure reference (if different from output)' + ) + + # CSV validation command - check CSV for issues + parser_validate_csv = subparsers.add_parser( + 'validate_csv', + help='Validate CSV file for consistency and translation issues.' 
+ ) + parser_validate_csv.add_argument( + '--input', + type=str, + default='translations.csv', + help='CSV file to validate (default: translations.csv)' + ) + + # Parse arguments and execute appropriate command + args = parser.parse_args() + + try: + if args.command == 'extract': + extract_strings(args.output, args.filter, args.update_from) + elif args.command == 'patch': + patch_database(args.input) + elif args.command == 'export_csv': + export_to_csv(args.input, args.output) + elif args.command == 'import_csv': + import_from_csv(args.input, args.output, args.original) + elif args.command == 'validate_csv': + validate_csv(args.input) + else: + print(f"ERROR: Unknown command '{args.command}'") + parser.print_help() + + except KeyboardInterrupt: + print("\nOperation cancelled by user.") + except Exception as e: + print(f"ERROR: {str(e)}") + return 1 + + return 0 + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/README.md b/README.md index 48894cf..0bbe757 100644 --- a/README.md +++ b/README.md @@ -1 +1,251 @@ -# BA-translator \ No newline at end of file +# BA-translator + +*A powerful tool for extracting and applying translations from FlatBuffers-based SQLite databases* + +## Overview + +BA-translator is a comprehensive localization tool designed for games and applications that use FlatBuffers-serialized data stored in SQLite databases. It provides a complete workflow for extracting translatable strings, managing translations through CSV files, and applying them back to the database. + +## Key Features + +- **Extract translatable strings** from FlatBuffers-based SQLite databases to JSON format +- **Export translations to CSV** format for easy collaboration with translators +- **Import completed translations** from CSV back to JSON +- **Apply translations** to the database with automatic backup creation +- **Validate translation consistency** and detect potential issues +- **Merge multiple CSV files** from different translators +- **Generate FlatBuffers schemas** from C# source files +- **Create repository mappings** for database table relationships + +## System Requirements + +### Required Dependencies +- **Python 3.7+** (Python 3.8+ recommended) +- **sqlite3** (included with Python) +- **flatbuffers** - for binary data serialization +- **tqdm** - for progress bars +- **unidecode** - for text processing +- **flatc** - FlatBuffers compiler (for generating Python modules from .fbs schemas) + +### Required Files +- `ExcelDB.db` - The main SQLite database file +- `repository_map.json` - Generated mapping file (created by `dumpdbschema.py`) +- `types.cs` - C# source file with type definitions (for schema generation) +- `flatc.exe` - FlatBuffers compiler executable (Windows) or `flatc` (Linux/macOS) + +## Installation + +1. **Clone or download** this repository: + ```bash + git clone + cd BA-translator + ``` + +2. **Install Python dependencies**: + ```bash + pip install -r requirements.txt + ``` + + Or install manually: + ```bash + pip install flatbuffers tqdm unidecode + ``` + +3. **Prepare required files**: + - Place `ExcelDB.db` in the project root directory + - Place `types.cs` (decompiled C# file) in the project root + +4. **Generate repository mapping**: + ```bash + python dumpdbschema.py + ``` + This creates `repository_map.json` which maps database tables to their schemas. + +5. 
**Set up FlatBuffers compiler** (if not installed): + - **Windows**: Run `setup_flatc.bat` to automatically download `flatc.exe` + - **Linux/macOS**: Install FlatBuffers: `sudo apt install flatbuffers-compiler` or `brew install flatbuffers` + +6. **Generate FlatBuffers schemas** (if needed): + ```bash + python parser.py + ``` + This creates `generated_schema.fbs` from C# types. + +7. **Generate Python modules from schemas**: + ```bash + python generate_flatbuffer_folders.py --auto + ``` + This automatically finds .fbs files and generates compatible Python modules. + +## Quick Start + +### Basic Translation Workflow + +1. **Extract translatable strings**: + ```bash + python BAtranslator.py extract --output translations.json + ``` + +2. **Export to CSV for translators**: + ```bash + python BAtranslator.py export_csv --input translations.json + ``` + This creates two files: + - `translations.csv` - Complete file for merging + - `translations_for_translators.csv` - Simplified file for translation work + +3. **Translate the CSV file** using any CSV editor (LibreOffice Calc, Google Sheets, or any text editor) + +4. **Import completed translations**: + ```bash + python BAtranslator.py import_csv --input translations_for_translators.csv --output translations_updated.json + ``` + +5. **Validate translations** (optional but recommended): + ```bash + python BAtranslator.py validate_csv --input translations_for_translators.csv + ``` + +6. **Apply translations to database**: + ```bash + python BAtranslator.py patch --input translations_updated.json + ``` + This automatically creates a backup (`ExcelDB.db.bak`) before applying changes. + +### Advanced Usage + +**Update existing translations with new content**: +```bash +python BAtranslator.py extract --output new_translations.json --update-from old_translations.json +``` + +**Merge multiple translator CSV files**: +```bash +python BAtranslator.py merge_csv translator1.csv translator2.csv translator3.csv --output merged_translations.csv +``` + +## Command Reference + +### Main Translation Commands + +| Command | Description | Usage | +|---------|-------------|-------| +| `extract` | Extract strings from database to JSON | `python BAtranslator.py extract --output file.json` | +| `patch` | Apply translations from JSON to database | `python BAtranslator.py patch --input file.json` | +| `export_csv` | Convert JSON to CSV format | `python BAtranslator.py export_csv --input file.json` | +| `import_csv` | Convert CSV to JSON format | `python BAtranslator.py import_csv --input file.csv --output file.json` | +| `validate_csv` | Validate CSV file consistency | `python BAtranslator.py validate_csv --input file.csv` | +| `merge_csv` | Merge multiple CSV files | `python BAtranslator.py merge_csv file1.csv file2.csv --output merged.csv` | + +### FlatBuffers Generation Commands + +| Command | Description | Usage | +|---------|-------------|-------| +| `parser.py` | Generate .fbs schema from C# types | `python parser.py` | +| `dumpdbschema.py` | Generate repository mapping | `python dumpdbschema.py` | +| `generate_flatbuffer_folders.py` | Generate Python modules from .fbs | `python generate_flatbuffer_folders.py --auto` | + +## File Formats + +### JSON Translation Format +```json +{ + "TableName": { + "row_id": { + "field_name": { + "original": "Original text", + "translation": "Translated text" + } + } + } +} +``` + +### CSV Format +| Column | Description | +|--------|-------------| +| GroupID | Identifier for grouping identical texts | +| Original | Original text to 
translate |
+| Translation | Translated text (fill this column) |
+| SQLTable | Database table name |
+| RowID | Row identifier |
+| Field | Field name |
+| HasCodes | Indicates special characters present |
+| Codes | Encoded special characters |
+
+## FlatBuffers Schema Generation
+
+### Quick Setup
+Generate Python modules from .fbs schema files:
+
+```bash
+# Setup flatc compiler (Windows)
+setup_flatc.bat
+
+# Generate Python modules automatically
+python generate_flatbuffer_folders.py --auto
+
+# Or from specific files
+python generate_flatbuffer_folders.py schema.fbs
+```
+
+### Manual FlatBuffers Setup
+1. **Install FlatBuffers compiler**:
+   - **Windows**: Run `setup_flatc.bat` or download from [FlatBuffers releases](https://github.com/google/flatbuffers/releases)
+   - **Linux**: `sudo apt install flatbuffers-compiler`
+   - **macOS**: `brew install flatbuffers`
+
+2. **Generate Python modules**:
+   ```bash
+   # From C# source (if available)
+   python parser.py
+
+   # Generate Python modules
+   python generate_flatbuffer_folders.py --auto --verbose
+   ```
+
+3. **Verify generation**:
+   ```bash
+   # Check integration with BAtranslator
+   python -c "from BAtranslator import build_schema_map; build_schema_map(); print('OK')"
+   ```
+
+## Troubleshooting
+
+### Common Issues
+
+1. **"Database file 'ExcelDB.db' not found"**
+   - Ensure the database file is in the project root directory
+   - Check the filename is exactly `ExcelDB.db` (case-sensitive)
+
+2. **"Module not found" errors**
+   - Run `python generate_flatbuffer_folders.py --auto` to generate required schema modules
+   - Ensure `FlatData/` directory contains the generated Python files
+   - Check that `__init__.py` files exist in generated directories
+
+3. **"File 'types.cs' not found"**
+   - Place the decompiled C# file in the project root
+   - Run `python dumpdbschema.py` to generate the repository mapping
+
+4. **"flatc not found" or "flatc command failed"**
+   - **Windows**: Run `setup_flatc.bat` to download flatc.exe automatically
+   - **Linux/macOS**: Install FlatBuffers compiler using your package manager
+   - Verify installation: `flatc --version`
+
+5. **"Generated modules import errors"**
+   - Regenerate modules: `python generate_flatbuffer_folders.py --auto --clean`
+   - Check that `__init__.py` files exist in FlatData directory
+
+## License
+
+This project is open source. Please refer to the license file for more details.
+
+## Contributing
+
+Contributions are welcome! Please feel free to submit issues and pull requests.
+
+---
+
+**📖 For Russian documentation, see [README_RU.md](README_RU.md)**
+
+**⚠️ This project was tested on older versions of Blue Archive and may not work correctly on newer ones. Please report any issues you encounter.**
\ No newline at end of file
diff --git a/dumpdbschema.py b/dumpdbschema.py
new file mode 100644
index 0000000..e1e19ca
--- /dev/null
+++ b/dumpdbschema.py
@@ -0,0 +1,121 @@
+#!/usr/bin/env python3
+"""
+Database Schema Dumper
+
+This module parses C# files to extract repository and database schema information,
+creating a mapping file that connects repositories to their corresponding
+database tables and schema classes. Requires decompiled C# files to generate the schema.
+"""
+
+import json
+import os
+import re
+from collections import OrderedDict
+from tqdm import tqdm
+
+# Configuration
+CSHARP_FILE = 'types.cs'
+OUTPUT_MAP_FILE = 'repository_map.json'
+
+def parse_csharp_files():
+    """Parse C# file to extract repository and database schema information.
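+
+    Only class headers and auto-properties are inspected. A hypothetical
+    fragment of types.cs that the regular expressions below would match:
+
+        public class CharacterExcelRepository : BaseExcelRepository<long, DBSchema_CharacterExcel, CharacterExcel>
+        public class DBSchema_CharacterExcel : BaseDBSchema
+        {
+            public long Key { get; set; }
+        }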
+ + Returns: + tuple: (repositories_dict, db_schemas_dict) containing parsed information + """ + if not os.path.exists(CSHARP_FILE): + print(f"ERROR: File '{CSHARP_FILE}' not found.") + return None, None + + # Regular expressions for parsing + repo_pattern = re.compile( + r'public class (\w+)\s*:\s*BaseExcelRepository<[^,]+,\s*([^,]+),\s*([^>]+)>' + ) + db_schema_pattern = re.compile(r'public class (\w+)\s*:\s*BaseDBSchema') + prop_pattern = re.compile(r'public\s+([\w.<>\[\]?]+)\s+(\w+)\s*{\s*get;\s*set;\s*}') + + db_schemas = OrderedDict() + repositories = OrderedDict() + current_db_schema = None + + print(f"Analyzing '{CSHARP_FILE}' to create repository mapping...") + + with open(CSHARP_FILE, 'r', encoding='utf-8') as f: + lines = f.readlines() + + for line in tqdm(lines, desc="Parsing C# file"): + # Remove comments + line = line.strip().split('//')[0] + + # Look for repository definitions + repo_match = repo_pattern.search(line) + if repo_match: + repo_name = repo_match.group(1) + db_schema_class = repo_match.group(2).strip() + blob_schema_class = repo_match.group(3).strip() + repositories[repo_name] = { + 'db_schema_class': db_schema_class, + 'blob_schema_class': blob_schema_class + } + continue + + # Look for database schema definitions + db_schema_match = db_schema_pattern.search(line) + if db_schema_match: + current_db_schema = db_schema_match.group(1) + db_schemas[current_db_schema] = [] + continue + + # If inside a database schema definition, look for properties + if current_db_schema: + prop_match = prop_pattern.search(line) + if prop_match: + prop_type = prop_match.group(1) + prop_name = prop_match.group(2) + db_schemas[current_db_schema].append({ + 'name': prop_name, + 'type': prop_type + }) + + # End of class definition + if '}' in line and '{' not in line: + current_db_schema = None + + print(f"\nFound {len(repositories)} repositories and {len(db_schemas)} database schemas.") + return repositories, db_schemas + +def main(): + """Main function to create the repository mapping file.""" + repos, schemas = parse_csharp_files() + + if not repos: + print("No repositories found. Nothing to save.") + return + + # Combine information into final mapping + final_map = OrderedDict() + + for repo_name, repo_data in repos.items(): + db_schema_name = repo_data['db_schema_class'] + + # Database table name is the database schema class name + table_name = db_schema_name + + # Find key columns for this schema + key_columns = schemas.get(db_schema_name, []) + + final_map[repo_name] = { + 'table_name': table_name, + 'key_columns': key_columns, + 'blob_schema_class': repo_data['blob_schema_class'] + } + + print(f"Saving repository mapping to '{OUTPUT_MAP_FILE}'...") + with open(OUTPUT_MAP_FILE, 'w', encoding='utf-8') as f: + json.dump(final_map, f, indent=2, ensure_ascii=False) + + print("Done! Repository mapping created successfully.") + print(f"You can now use '{OUTPUT_MAP_FILE}' as the source of truth for database operations.") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/generate_flatbuffer_folders.py b/generate_flatbuffer_folders.py new file mode 100644 index 0000000..986dc82 --- /dev/null +++ b/generate_flatbuffer_folders.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +""" +FlatBuffers Python Code Generator + +Generates Python modules from .fbs schema files for BA-translator. 
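+
+Typical usage (mirrors the CLI defined below):
+
+    python generate_flatbuffer_folders.py --auto --verbose
+    python generate_flatbuffer_folders.py generated_schema.fbs --clean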
+""" + +import os +import sys +import subprocess +import shutil +import argparse +from pathlib import Path + + +def log_info(message, verbose=False): + """Print info message if verbose mode is enabled.""" + if verbose: + print(f"[INFO] {message}") + +def log_error(message): + """Print error message.""" + print(f"[ERROR] {message}") + +def check_flatc(flatc_path='flatc'): + """Check if flatc compiler is available.""" + try: + result = subprocess.run([flatc_path, '--version'], capture_output=True, text=True) + return result.returncode == 0 + except FileNotFoundError: + return False + +def get_namespace(fbs_file): + """Extract namespace from .fbs file.""" + try: + with open(fbs_file, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if line.startswith('namespace '): + return line.replace('namespace ', '').replace(';', '').strip() + except Exception: + pass + return 'FlatData' + +def create_init_file(directory): + """Create __init__.py file in directory.""" + init_file = Path(directory) / '__init__.py' + if not init_file.exists(): + init_file.write_text('# Generated FlatBuffers Python package\n') + +def generate_python_modules(fbs_files, flatc_path='flatc', clean=False, verbose=False): + """Generate Python modules from .fbs files.""" + if not fbs_files: + log_error("No .fbs files provided") + return False + + if not check_flatc(flatc_path): + log_error(f"flatc compiler not found. Please install FlatBuffers.") + return False + + script_dir = Path(__file__).parent.absolute() + + # Clean existing files if requested + if clean: + for dirname in ['FlatData', 'MX']: + dir_path = script_dir / dirname + if dir_path.exists(): + log_info(f"Cleaning {dir_path}", verbose) + shutil.rmtree(dir_path) + + generated_dirs = set() + + for fbs_file in fbs_files: + if not Path(fbs_file).exists(): + log_error(f"Schema file not found: {fbs_file}") + return False + + log_info(f"Processing {fbs_file}", verbose) + + # Generate Python code using flatc + cmd = [ + flatc_path, + '--python', + '--gen-object-api', + '-o', str(script_dir), + fbs_file + ] + + try: + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode != 0: + log_error(f"flatc failed for {fbs_file}") + if result.stderr: + print(f"stderr: {result.stderr}") + return False + + # Get namespace and add to generated dirs + namespace = get_namespace(fbs_file) + generated_dirs.add(script_dir / namespace) + log_info(f"Generated modules for {fbs_file}", verbose) + + except Exception as e: + log_error(f"Exception running flatc: {e}") + return False + + # Create __init__.py files + for gen_dir in generated_dirs: + if gen_dir.exists(): + create_init_file(gen_dir) + log_info(f"Created __init__.py in {gen_dir}", verbose) + + return True + +def find_fbs_files(directory='.'): + """Find all .fbs files in directory.""" + fbs_files = [] + for root, _, files in os.walk(directory): + for file in files: + if file.endswith('.fbs'): + fbs_files.append(os.path.join(root, file)) + return fbs_files + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description='Generate Python modules from FlatBuffers schema files') + + parser.add_argument('fbs_files', nargs='*', help='.fbs schema files to process') + parser.add_argument('--auto', action='store_true', help='Auto-find all .fbs files') + parser.add_argument('--flatc-path', default='flatc', help='Path to flatc compiler') + parser.add_argument('--clean', action='store_true', help='Clean existing files first') + parser.add_argument('--verbose', '-v', 
action='store_true', help='Verbose output') + + args = parser.parse_args() + + # Get .fbs files + if args.auto: + fbs_files = find_fbs_files() + if not fbs_files: + log_error("No .fbs files found") + sys.exit(1) + log_info(f"Found {len(fbs_files)} .fbs files", args.verbose) + elif args.fbs_files: + fbs_files = args.fbs_files + else: + parser.print_help() + sys.exit(1) + + # Generate modules + success = generate_python_modules( + fbs_files=fbs_files, + flatc_path=args.flatc_path, + clean=args.clean, + verbose=args.verbose + ) + + if success: + print("[SUCCESS] Python modules generated successfully") + else: + sys.exit(1) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/parser.py b/parser.py new file mode 100644 index 0000000..73e0f3b --- /dev/null +++ b/parser.py @@ -0,0 +1,382 @@ +#!/usr/bin/env python3 +""" +C# to FlatBuffers Schema Parser + +This module parses C# files containing FlatBuffers object definitions and generates +corresponding .fbs schema files. Requires decompiled C# files to generate the schema. +""" + +import re +import os +from collections import defaultdict +from unidecode import unidecode + +# Configuration +INPUT_CSHARP_FILE = 'types.cs' +OUTPUT_FBS_FILE = 'generated_schema.fbs' +DEFAULT_NAMESPACE = 'FlatData' + +# Type mapping from C# to FlatBuffers +CSHARP_TO_FBS_TYPE_MAP = { + 'long': 'long', 'ulong': 'ulong', 'int': 'int', 'uint': 'uint', + 'short': 'short', 'ushort': 'ushort', 'float': 'float', 'double': 'double', + 'bool': 'bool', 'string': 'string', 'byte': 'ubyte', 'sbyte': 'byte' +} + +def sanitize_identifier(name): + """Clean identifier names for FlatBuffers compatibility.""" + return re.sub(r'[^A-Za-z0-9_.]', '_', unidecode(name)) + +def pascal_to_snake_case(name): + """Convert PascalCase to snake_case.""" + name = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1_\2', name) + name = re.sub(r'([a-z\d])([A-Z])', r'\1_\2', name) + name = name.replace('-', '_') + return name.lower() + +def csharp_to_fbs_type(csharp_type): + """Convert C# type to FlatBuffers type.""" + if csharp_type is None: + return 'int' + + # Remove nullable indicators + csharp_type = csharp_type.replace('?', '') + + # Check direct mappings + if csharp_type in CSHARP_TO_FBS_TYPE_MAP: + return CSHARP_TO_FBS_TYPE_MAP[csharp_type] + + # Handle custom types + return sanitize_identifier(csharp_type) + + +def parse_csharp_file(input_file): + """Parse C# file and extract table and enum definitions. 
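+
+    A hypothetical fragment of the decompiled types.cs showing the constructs
+    the patterns below recognize (enums, IFlatbufferObject structs, properties):
+
+        public enum Nation
+        {
+            None = 0,
+            Gehenna = 1
+        }
+        public struct CharacterExcel : IFlatbufferObject
+        {
+            public long Id { get; }
+            public string Name { get; }
+        }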
+ + Args: + input_file (str): Path to C# input file + + Returns: + dict: Dictionary of parsed definitions with full names as keys + """ + all_definitions = {} + + with open(input_file, 'r', encoding='utf-8') as f: + current_namespace = "_GLOBAL_" + in_block = None + current_name = None + current_fields = [] + current_enum_base_type = 'int' + seen_enum_values = set() + + for line in f: + line = line.strip() + + # Parse namespace declarations + ns_match = re.match(r'namespace (\S+)', line) + if ns_match: + current_namespace = ns_match.group(1).replace(';', '') + continue + + # End of block + if line.startswith('}') and in_block: + full_name = f"{current_namespace}.{current_name}" + if in_block == 'table': + all_definitions[full_name] = { + 'type': 'table', + 'ns': current_namespace, + 'name': current_name, + 'fields': current_fields + } + elif in_block == 'enum': + all_definitions[full_name] = { + 'type': 'enum', + 'ns': current_namespace, + 'name': current_name, + 'base': current_enum_base_type, + 'fields': current_fields + } + + in_block = None + current_fields = [] + continue + + # Parse struct/table definitions + table_match = re.search(r'public struct (\w+) : IFlatbufferObject', line) + enum_match = re.search(r'public enum (\w+)(?:\s*:\s*(\w+))?', line) + + if table_match: + in_block = 'table' + current_name = sanitize_identifier(table_match.group(1)) + continue + elif enum_match: + in_block = 'enum' + current_name = sanitize_identifier(enum_match.group(1)) + csharp_base_type = enum_match.group(2) + current_enum_base_type = csharp_to_fbs_type(csharp_base_type) + seen_enum_values.clear() + continue + + if not in_block: + continue + + # Parse enum fields + if in_block == 'enum': + field_match = re.match(r'(\w+)\s*=\s*(-?\d+)', line) + if field_match: + field_name = sanitize_identifier(field_match.group(1)) + field_value = int(field_match.group(2)) + + if field_value not in seen_enum_values: + seen_enum_values.add(field_value) + current_fields.append(f'{field_name} = {field_value}') + continue + + # Parse table fields + if in_block == 'table': + if not line.startswith('public'): + continue + + # Parse vector methods + vec_match = re.search( + r'public\s+(?:[^\s<]+<(\S+)>|(\S+))\s+(\w+)\s*\(int\s+\w+\)', + line + ) + if vec_match: + csharp_type = vec_match.group(1) if vec_match.group(1) else vec_match.group(2) + fbs_type = csharp_to_fbs_type(csharp_type) + current_fields.append({ + 'original': sanitize_identifier(vec_match.group(3)), + 'type': f'[{fbs_type}]' + }) + continue + + # Parse property definitions + prop_match = re.search( + r'public\s+(?:Nullable<(\S+)>|ArraySegment|(\S+))\s+(\w+)\s*{', + line + ) + if prop_match: + nullable_type, full_type, csharp_name = prop_match.groups() + csharp_type = nullable_type if nullable_type else full_type + + # Skip internal FlatBuffers fields + if csharp_name == 'ByteBuffer' or csharp_name.endswith('Length'): + continue + + # Determine field type + if csharp_type == 'ArraySegment': + field_type = '[ubyte]' + else: + field_type = csharp_to_fbs_type(csharp_type) + + current_fields.append({ + 'original': sanitize_identifier(csharp_name), + 'type': field_type + }) + continue + + # Handle global namespace + if "_GLOBAL_" in {d['ns'] for d in all_definitions.values()}: + for name, data in list(all_definitions.items()): + if data['ns'] == "_GLOBAL_": + new_name = f"{DEFAULT_NAMESPACE}.{data['name']}" + all_definitions[new_name] = data + data['ns'] = DEFAULT_NAMESPACE + del all_definitions[name] + + return all_definitions + + +def 
find_full_type_name(base_type, current_ns, all_defs): + """Find the full qualified name for a type reference. + + Args: + base_type (str): Base type name to find + current_ns (str): Current namespace context + all_defs (dict): All available type definitions + + Returns: + str or None: Full qualified type name if found + """ + # Try current namespace first + if f"{current_ns}.{base_type}" in all_defs: + return f"{current_ns}.{base_type}" + + # Try default namespace + if f"{DEFAULT_NAMESPACE}.{base_type}" in all_defs: + return f"{DEFAULT_NAMESPACE}.{base_type}" + + # Try global scope + if base_type in all_defs: + return base_type + + # Search in all namespaces + for name in all_defs: + if name.endswith(f".{base_type}"): + return name + + return None + +def generate_fbs_schema(all_definitions, output_file): + """Generate FlatBuffers schema file from parsed definitions. + + Args: + all_definitions (dict): All parsed type definitions + output_file (str): Path to output .fbs file + """ + # Step 1: Filter and resolve dependencies + root_types = {name for name, data in all_definitions.items() if data['type'] == 'table'} + used_types = set() + queue = list(root_types) + + while queue: + type_name = queue.pop(0) + if type_name in used_types or type_name not in all_definitions: + continue + + used_types.add(type_name) + data = all_definitions[type_name] + + if data['type'] == 'table': + for field in data['fields']: + base_type = field['type'].strip('[]') + found_dep = find_full_type_name(base_type, data['ns'], all_definitions) + if found_dep and found_dep not in used_types: + queue.append(found_dep) + + final_definitions = {name: data for name, data in all_definitions.items() if name in used_types} + + # Step 2: Separate tables and enums + tables = {name: data for name, data in final_definitions.items() if data['type'] == 'table'} + enums = {name: data for name, data in final_definitions.items() if data['type'] == 'enum'} + + # Step 3: Topological sort for dependency order + in_degree = {t: 0 for t in tables} + adj = defaultdict(list) + + for name, data in tables.items(): + for field in data['fields']: + base_type = field['type'].strip('[]') + found_dep = find_full_type_name(base_type, data['ns'], tables) + if found_dep: + adj[found_dep].append(name) + in_degree[name] += 1 + + # Topological sort + queue = [t for t in tables if in_degree[t] == 0] + sorted_tables = [] + + while queue: + t = queue.pop(0) + sorted_tables.append(t) + for neighbor in adj.get(t, []): + in_degree[neighbor] -= 1 + if in_degree[neighbor] == 0: + queue.append(neighbor) + + # Handle cycles + cyclic_tables = set(tables.keys()) - set(sorted_tables) + sorted_tables.extend(list(cyclic_tables)) + + # Step 4: Group by namespace + defs_by_ns = defaultdict(lambda: {'enums': [], 'tables': [], 'cycles': []}) + + for name, data in enums.items(): + defs_by_ns[data['ns']]['enums'].append(data) + + for name in sorted_tables: + data = tables[name] + defs_by_ns[data['ns']]['tables'].append(data) + if name in cyclic_tables: + defs_by_ns[data['ns']]['cycles'].append(data['name']) + + # Step 5: Generate FlatBuffers schema file + with open(output_file, 'w', encoding='utf-8') as f: + f.write('// Auto-generated FlatBuffers schema\n') + f.write('// Field order is preserved. 
Key attributes are properly handled.\n\n') + + for ns, data in sorted(defs_by_ns.items()): + f.write(f'// ----- NAMESPACE: {ns} -----\n') + f.write(f'namespace {ns};\n\n') + + # Forward declarations for circular dependencies + if data['cycles']: + f.write('// Forward declarations for circular dependencies\n') + for table_name in sorted(data['cycles']): + f.write(f'table {table_name};\n') + f.write('\n') + + # Enums + if data['enums']: + f.write('// --- Enums ---\n') + for definition in sorted(data['enums'], key=lambda x: x['name']): + f.write(f'enum {definition["name"]} : {definition["base"]} {{\n') + for field in definition['fields']: + f.write(f' {field},\n') + f.write('}\n\n') + + # Tables + if data['tables']: + f.write('// --- Tables ---\n') + for definition in data['tables']: + f.write(f'table {definition["name"]} {{\n') + + # Handle field naming conflicts + snake_to_original = defaultdict(list) + for field in definition['fields']: + snake_to_original[pascal_to_snake_case(field['original'])].append(field['original']) + + # Track if key attribute was added + key_field_added = False + + for field in definition['fields']: + snake_name = pascal_to_snake_case(field['original']) + field_name = (field['original'] if len(snake_to_original[snake_name]) > 1 + else snake_name) + + is_array = field['type'].startswith('[') + base_type = field['type'].strip('[]') + final_type_str = field['type'] + + # Resolve type references + full_dep_name = find_full_type_name(base_type, definition['ns'], final_definitions) + if full_dep_name: + dep_data = final_definitions[full_dep_name] + simple_name = dep_data['name'] + + if dep_data['ns'] != definition['ns']: + final_type_str = f"{dep_data['ns']}.{simple_name}" + else: + final_type_str = simple_name + + if is_array: + final_type_str = f"[{final_type_str}]" + + # Add key attribute for primary key fields + key_suffix = "" + if (not key_field_added and + field_name.lower() in ['key', 'id'] and + not is_array): + key_suffix = " (key)" + key_field_added = True + + f.write(f' {field_name}:{final_type_str}{key_suffix};\n') + + f.write('}\n\n') + + print(f"Success! Schema with {len(final_definitions)} types saved to {output_file}") + +def main(): + """Main function to run the parser.""" + if not os.path.exists(INPUT_CSHARP_FILE): + print(f"Error: Input file '{INPUT_CSHARP_FILE}' not found.") + return + + print("Starting C# parsing...") + all_definitions = parse_csharp_file(INPUT_CSHARP_FILE) + print(f"Parsed {len(all_definitions)} definitions. Generating .fbs schema...") + generate_fbs_schema(all_definitions, OUTPUT_FBS_FILE) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..fe76d4c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +flatbuffers>=2.0.0 +tqdm>=4.50.0 +unidecode>=1.1.0 \ No newline at end of file diff --git a/setup_flatc.bat b/setup_flatc.bat new file mode 100644 index 0000000..fade388 --- /dev/null +++ b/setup_flatc.bat @@ -0,0 +1,83 @@ +@echo off +REM FlatBuffers Compiler Setup Script for Windows +REM This script downloads and sets up the FlatBuffers compiler (flatc.exe) for use with the BA-translator project + +echo FlatBuffers Compiler Setup for BA-translator +echo ============================================= +echo. + +REM Check if flatc.exe already exists +if exist "flatc.exe" ( + echo flatc.exe already exists in current directory. + echo Checking version... + flatc.exe --version + echo. 
+ choice /C YN /M "Do you want to re-download flatc.exe" + if errorlevel 2 goto :end + echo. +) + +echo Downloading FlatBuffers compiler... +echo. + +REM Create temporary directory +set TEMP_DIR=%TEMP%\flatbuffers_download +if exist "%TEMP_DIR%" rmdir /s /q "%TEMP_DIR%" +mkdir "%TEMP_DIR%" + +REM Download the latest FlatBuffers release (adjust URL as needed) +echo Downloading from GitHub releases... +powershell -Command "& {[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; Invoke-WebRequest -Uri 'https://github.com/google/flatbuffers/releases/latest/download/Windows.flatc.binary.zip' -OutFile '%TEMP_DIR%\flatbuffers.zip'}" + +if not exist "%TEMP_DIR%\flatbuffers.zip" ( + echo. + echo ERROR: Failed to download FlatBuffers compiler. + echo Please download manually from: https://github.com/google/flatbuffers/releases + echo Extract flatc.exe to the current directory. + echo. + pause + goto :end +) + +echo Extracting flatc.exe... +powershell -Command "Expand-Archive -Path '%TEMP_DIR%\flatbuffers.zip' -DestinationPath '%TEMP_DIR%' -Force" + +REM Find and copy flatc.exe +for /r "%TEMP_DIR%" %%f in (flatc.exe) do ( + if exist "%%f" ( + copy "%%f" "%~dp0flatc.exe" >nul + echo flatc.exe copied to current directory. + goto :found + ) +) + +echo ERROR: flatc.exe not found in downloaded archive. +echo Please download manually and place flatc.exe in the current directory. +pause +goto :cleanup + +:found +echo. +echo Testing flatc.exe... +flatc.exe --version +if errorlevel 1 ( + echo ERROR: flatc.exe is not working properly. + pause + goto :cleanup +) + +echo. +echo SUCCESS: FlatBuffers compiler is ready! +echo. +echo You can now use the following commands: +echo python generate_flatbuffer_folders.py --auto +echo python generate_flatbuffer_folders.py schema.fbs +echo. + +:cleanup +REM Clean up temporary files +if exist "%TEMP_DIR%" rmdir /s /q "%TEMP_DIR%" + +:end +echo Setup complete. +pause \ No newline at end of file