Files
BA-translator/BAtranslator.py
Imp0ssibl33z 78be294352 Fixes
2025-10-11 13:25:39 +03:00

1593 lines
57 KiB
Python

#!/usr/bin/env python3
"""
BA Translator - A tool for extracting and applying translations for Blue Archive.
"""
import sys
import os
import sqlite3
import json
from collections import OrderedDict, defaultdict
import importlib
import shutil
import re
import argparse
from tqdm import tqdm
import flatbuffers
import csv
from pathlib import Path
# Add script directory to path for schema imports
# (the directory is appended only when it is not already listed).
script_dir = Path(__file__).parent.absolute()
if str(script_dir) not in sys.path:
    sys.path.append(str(script_dir))
# Global configuration
# All names below are relative paths, so they resolve against the current
# working directory at the moment they are opened.
DB_FILE = 'ExcelDB.db'  # SQLite database read by extraction and rewritten by patching
DB_BACKUP_FILE = 'ExcelDB.db.bak'  # copy of DB_FILE written before patching
DEFAULT_JSON_FILE = 'translations.json'
REPO_MAP_FILE = 'repository_map.json'  # JSON mapping of repositories to table/schema names
BLOB_COLUMN = 'Bytes'  # column selected/updated as the serialized FlatBuffer blob
# Schema location mapping: module file stem -> dotted import path.
SCHEMA_LOCATION_MAP = {}


def build_schema_map():
    """Populate SCHEMA_LOCATION_MAP from the .py files below the working directory.

    Every ``*.py`` file except ``__init__.py`` is registered under its file
    stem; the value is the dotted module path relative to ``'.'``. When two
    files share a stem, the one visited last by ``os.walk`` wins.
    """
    for current_dir, _subdirs, entries in os.walk('.'):
        rel_dir = os.path.relpath(current_dir, '.')
        if rel_dir == '.':
            package_prefix = ''
        else:
            package_prefix = os.path.normpath(rel_dir).replace(os.sep, '.') + '.'
        for entry in entries:
            if entry == '__init__.py' or not entry.endswith('.py'):
                continue
            stem = entry[:-3]
            SCHEMA_LOCATION_MAP[stem] = package_prefix + stem


# Build the map once, at import time.
build_schema_map()
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
# Patterns for special character handling
# One or more consecutive characters in U+0000-U+001F or U+007F-U+009F.
CONTROL_CHAR_PATTERN = re.compile(r'[\x00-\x1f\x7F-\x9F]+')
# Whole-string match: only whitespace (\s) and characters in 0x21-0x7E.
# '{', '}' and ':' are listed explicitly but already fall inside that range.
STRICT_ASCII_FILTER_PATTERN = re.compile(r'^[\s\x21-\x7E{}:]*$')
def validate_required_files(*file_paths):
    """Check that every given path exists, reporting each one that does not.

    Args:
        *file_paths: Paths to check.

    Returns:
        bool: True when all paths exist (also when none are given),
        False as soon as at least one is missing.
    """
    absent = [candidate for candidate in file_paths if not os.path.exists(candidate)]
    for candidate in absent:
        print(f"ERROR: Required file '{candidate}' not found.")
    return not absent
def encode_special_chars(text):
    """Replace runs of control characters with numbered ``{C:n}`` placeholders.

    Args:
        text (str): Input text that may contain control characters.

    Returns:
        tuple: ``(clean_text, codes)``. ``codes[n]`` is the run of characters
        that placeholder ``{C:n}`` stands for. Falsy input, or text without
        any match, is returned unchanged together with an empty list.
    """
    if not text:
        return text, []

    captured = []

    def _stash(match):
        # The placeholder number is the position the run gets in `captured`.
        captured.append(match.group(0))
        return f"{{C:{len(captured) - 1}}}"

    clean_text = CONTROL_CHAR_PATTERN.sub(_stash, text)
    if not captured:
        return text, []
    return clean_text, captured
def decode_special_chars(clean_text, codes):
    """Substitute ``{C:n}`` placeholders with the matching entries of ``codes``.

    Args:
        clean_text (str): Text containing ``{C:n}`` placeholders.
        codes (list): Replacement strings, indexed by placeholder number.

    Returns:
        str: The text with placeholders expanded. ``clean_text`` is returned
        untouched when it or ``codes`` is empty/None.
    """
    if not (clean_text and codes):
        return clean_text

    def _restore(match):
        slot = int(match.group(1))
        # A placeholder whose number has no entry in `codes` stays as written.
        return codes[slot] if slot < len(codes) else match.group(0)

    return re.sub(r'{C:(\d+)}', _restore, clean_text)
def flatbuffer_to_dict(obj):
    """Recursively convert a FlatBuffer object to a Python dictionary.

    Primitives (None, int, float, bool, str) are returned unchanged and bytes
    are decoded as UTF-8, dropping undecodable bytes. For any other object,
    every attribute whose name starts with an upper-case letter and that is a
    Python-level callable taking no argument besides ``self`` is called; its
    result is converted recursively and stored under the attribute name.

    Args:
        obj: FlatBuffer object to convert

    Returns:
        dict or primitive: Converted object
    """
    if obj is None or isinstance(obj, (int, float, bool, str)):
        return obj
    if isinstance(obj, bytes):
        return obj.decode('utf-8', 'ignore')
    result = OrderedDict()
    # Get all public methods that look like FlatBuffer accessors
    for method_name in dir(obj):
        if not method_name[0].isupper():
            continue
        method = getattr(obj, method_name)
        if not callable(method):
            continue
        # Builtins and classes are callable but carry no __code__; treat them
        # as "not an accessor" instead of letting AttributeError escape.
        code = getattr(method, '__code__', None)
        if code is None or code.co_argcount != 1:
            continue
        try:
            value = method()
            # Handle array-like values
            if hasattr(value, 'Length') and callable(getattr(value, 'Length')):
                result[method_name] = [
                    flatbuffer_to_dict(value(i))
                    for i in range(value.Length())
                ]
            else:
                result[method_name] = flatbuffer_to_dict(value)
        except Exception:
            # Skip methods that fail to call
            continue
    return result
def dict_to_flatbuffer(builder, data_dict, schema_class):
    """Build a FlatBuffer from a dictionary using the schema class.

    The fields written are those for which the schema module exposes a
    ``<Schema>Add<Field>`` function and ``data_dict`` holds a non-None value.
    Fields whose value is None are left out of the buffer: passing None to an
    Add function fails, which previously made the whole row rebuild fail.

    Args:
        builder: FlatBuffer builder instance
        data_dict (dict): Data to serialize
        schema_class: FlatBuffer schema class

    Returns:
        int: Offset of the created object
    """
    schema_name = schema_class.__name__
    schema_module = sys.modules[schema_class.__module__]
    add_prefix = f"{schema_name}Add"
    # Determine field order from the schema (reverse order of dir()).
    field_order = []
    if hasattr(schema_class, f'GetRootAs{schema_name}'):
        field_order = [
            # Slice instead of str.replace so only the leading prefix is removed.
            name[len(add_prefix):]
            for name in reversed(dir(schema_module))
            if name.startswith(add_prefix)
        ]
    # Only fields that will actually be written.
    present = [name for name in field_order if data_dict.get(name) is not None]
    # Pre-create string pointers: this happens before the Start call below,
    # and only for fields that end up in the table.
    string_pointers = {
        name: builder.CreateString(data_dict[name])
        for name in present
        if isinstance(data_dict[name], str)
    }
    # Start building the object
    start_method = getattr(schema_module, f"{schema_name}Start")
    start_method(builder)
    # Add fields in the correct order
    for name in present:
        add_method = getattr(schema_module, f"{add_prefix}{name}")
        if name in string_pointers:
            add_method(builder, string_pointers[name])
        else:
            add_method(builder, data_dict[name])
    # Finish building
    end_method = getattr(schema_module, f"{schema_name}End")
    return end_method(builder)
def create_translation_memory(old_json_data):
    """Collect existing translations keyed by their original text.

    Args:
        old_json_data (dict): Previously exported data, nested as
            table -> row -> field -> entry.

    Returns:
        dict: Original text mapped to its translation. Only entries that have
        both an ``original`` and a non-empty ``translation`` differing from
        the original text are included; later entries overwrite earlier ones.
    """
    print("Creating translation memory from existing data...")
    memory = {}
    entries = (
        entry
        for table in old_json_data.values()
        for row in table.values()
        for entry in row.values()
    )
    for entry in entries:
        if not (isinstance(entry, dict) and 'original' in entry and 'translation' in entry):
            continue
        source = entry['original']
        translated = entry['translation']
        # `original` is either the bare text or a mapping carrying it under 'text'.
        source_text = source if isinstance(source, str) else source.get('text')
        if translated and translated != source_text:
            memory[source_text] = translated
    print(f"Translation memory created with {len(memory)} unique translations.")
    return memory
# ============================================================================
# MAIN FUNCTIONS
# ============================================================================
def extract_strings(output_file, filter_str=None, update_from=None):
    """Extract translatable strings from the database.

    When the repository map file is missing, the support files are generated
    first: the FlatBuffer schema and its Python modules (only if
    'generated_schema.fbs' is missing too) and then the repository map. All of
    them are derived from 'types.cs' in the working directory.

    Args:
        output_file (str): Path to output JSON file
        filter_str (str): Optional filter (e.g., 'is_ascii', 'table_name:TableName')
        update_from (str): Path to existing JSON file to merge translations from
    """
    # Auto-setup: Create all required files if they don't exist
    setup_required = False
    # Check if repository_map.json exists
    if not os.path.exists(REPO_MAP_FILE):
        print("Repository map not found. Auto-generating from types.cs...")
        setup_required = True
        # Check if schema exists
        if not os.path.exists('generated_schema.fbs'):
            print("FlatBuffer schema not found. Generating from types.cs...")
            if not os.path.exists('types.cs'):
                print("ERROR: types.cs not found. Cannot auto-generate files.")
                print("Please place types.cs in the project directory.")
                return
            # Generate schema
            setup_schema_from_csharp('types.cs', 'generated_schema.fbs')
            # Preprocess schema to fix reserved keywords
            print("Preprocessing schema to fix Python reserved keywords...")
            preprocess_flatbuffer_schema('generated_schema.fbs')
            # Generate Python modules
            print("Generating Python modules from schema...")
            generate_flatbuffer_python('generated_schema.fbs', 'flatc.exe', '.')
            # SCHEMA_LOCATION_MAP was filled when this module was imported,
            # i.e. before the modules above existed. Refresh the import
            # system's caches and rebuild the map so the tables processed
            # below can find their schema classes.
            importlib.invalidate_caches()
            build_schema_map()
            print()
        # Generate repository mapping
        setup_repository_mapping('types.cs', REPO_MAP_FILE)
        print()
    if setup_required:
        print("✓ Auto-setup completed! Proceeding with extraction...\n")
    # Validate required files
    if not validate_required_files(REPO_MAP_FILE, DB_FILE):
        return
    # Load existing translations if specified
    translation_memory = {}
    if update_from:
        if os.path.exists(update_from):
            with open(update_from, 'r', encoding='utf-8') as f:
                old_data = json.load(f)
            translation_memory = create_translation_memory(old_data)
        else:
            print(f"WARNING: Update file '{update_from}' not found.")
    # Parse filter
    filter_type, filter_value = _parse_filter(filter_str)
    # Load repository mapping and connect to database
    with open(REPO_MAP_FILE, 'r', encoding='utf-8') as f:
        repo_map = json.load(f)
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()
    translations_dict = OrderedDict()
    try:
        print("Extracting translatable strings...")
        for repo_info in tqdm(repo_map.values(), desc="Processing repositories"):
            table_name = repo_info['table_name']
            # Apply table filter
            if filter_type == 'table_name' and table_name != filter_value:
                continue
            table_translations = _process_table(
                cursor, repo_info, filter_type, filter_value, translation_memory
            )
            if table_translations:
                translations_dict[table_name] = table_translations
    finally:
        # Close the connection whether or not extraction raised.
        conn.close()
    if not translations_dict:
        print("No strings found matching the filter.")
        return
    # Save results
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(translations_dict, f, indent=2, ensure_ascii=False)
    print(f"\nSuccess! Translation data saved to '{output_file}'.")
def _parse_filter(filter_str):
"""Parse filter string into type and value."""
if not filter_str:
return None, None
if ':' in filter_str:
filter_type, filter_value = filter_str.split(':', 1)
else:
filter_type, filter_value = filter_str, None
print(f"Applying filter: type='{filter_type}', value='{filter_value}'")
return filter_type, filter_value
def _process_table(cursor, repo_info, filter_type, filter_value, translation_memory):
    """Collect the translatable strings of one table.

    Args:
        cursor: SQLite cursor on the database.
        repo_info (dict): Mapping entry providing 'table_name' and
            'blob_schema_class'.
        filter_type, filter_value: Content filter forwarded to the field
            extraction.
        translation_memory (dict): Known translations keyed by original text.

    Returns:
        OrderedDict or None: rowid -> extracted fields. None when the schema
        class is unknown or cannot be loaded, the table is absent, a database
        error occurs, or no row yields any string.
    """
    table_name = repo_info['table_name']
    schema_name = repo_info['blob_schema_class']
    try:
        # Resolve the schema class and its root accessor.
        module_path = SCHEMA_LOCATION_MAP.get(schema_name)
        if not module_path:
            return None
        schema_class = getattr(importlib.import_module(module_path), schema_name)
        read_root = getattr(schema_class, f"GetRootAs{schema_name}")

        # Tables missing from the database are skipped without a message.
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table_name,)
        )
        if not cursor.fetchone():
            return None

        cursor.execute(f'SELECT rowid, "{BLOB_COLUMN}" FROM "{table_name}"')
        collected = OrderedDict()
        for row_id, blob in cursor.fetchall():
            if not blob:
                continue
            try:
                parsed = flatbuffer_to_dict(read_root(blob, 0))
                fields = _extract_string_fields(
                    parsed, filter_type, filter_value, translation_memory
                )
                if fields:
                    collected[row_id] = fields
            except Exception:
                # A row that cannot be processed is skipped.
                continue
        return collected or None
    except (ImportError, AttributeError):
        # Schema module or class could not be loaded.
        return None
    except sqlite3.OperationalError:
        # Database errors are swallowed silently.
        return None
def _extract_string_fields(data_dict, filter_type, filter_value, translation_memory):
"""Extract and filter string fields from FlatBuffer data."""
string_fields = OrderedDict()
for field, value in data_dict.items():
if not isinstance(value, str) or not value:
continue
clean_text, codes = encode_special_chars(value)
# Apply content filters
if not _passes_filter(clean_text, filter_type, filter_value):
continue
# Create original entry
original_entry = {"text": clean_text, "codes": codes} if codes else clean_text
# Get existing translation
existing_translation = translation_memory.get(clean_text, "")
string_fields[field] = {
"original": original_entry,
"translation": existing_translation
}
return string_fields
def _passes_filter(text, filter_type, filter_value):
"""Check if text passes the specified filter."""
if filter_type is None:
return True
elif filter_type == 'is_ascii':
return bool(STRICT_ASCII_FILTER_PATTERN.match(text))
elif filter_type == 'contains_text':
return filter_value in text
return True
def patch_database(input_file):
    """Write the translations of a JSON file into the database.

    Asks for confirmation on stdin, copies the database to the backup file,
    then applies every changed translation inside one transaction that is
    rolled back if applying raises.

    Args:
        input_file (str): Path to JSON file containing translations
    """
    if not validate_required_files(REPO_MAP_FILE, input_file, DB_FILE):
        return
    print(f"--- PATCHING MODE: '{input_file}' -> '{DB_FILE}' ---")

    # Anything other than "yes"/"y" (case-insensitive) cancels.
    answer = input("Are you sure? A backup will be created. (yes/no): ").lower()
    if answer != 'yes' and answer != 'y':
        print("Operation cancelled.")
        return

    print(f"Creating backup '{DB_BACKUP_FILE}'...")
    shutil.copyfile(DB_FILE, DB_BACKUP_FILE)

    # Re-key the repository map by table name for direct lookup.
    with open(REPO_MAP_FILE, 'r', encoding='utf-8') as map_file:
        repo_map = {
            entry['table_name']: entry
            for entry in json.load(map_file).values()
        }
    with open(input_file, 'r', encoding='utf-8') as json_file:
        translations = json.load(json_file)

    pending = _analyze_translation_changes(translations)
    if not pending:
        print("No changes found to apply.")
        return
    print(f"Found {len(pending)} rows to update.")

    connection = sqlite3.connect(DB_FILE)
    db_cursor = connection.cursor()
    try:
        updated = _apply_database_changes(db_cursor, repo_map, pending)
        connection.commit()
        print(f"\nSuccess! Updated {updated} database entries.")
    except Exception as error:
        connection.rollback()
        print(f"ERROR during patching: {error}")
        print("Database rolled back to original state.")
    finally:
        connection.close()
def _analyze_translation_changes(translations):
"""Analyze translation JSON and extract changes to apply.
Args:
translations (dict): Translation data from JSON file
Returns:
list: List of changes to apply, each containing table, row_id, and fields
"""
changes_to_apply = []
for table_name, table_data in translations.items():
for row_id_str, fields in table_data.items():
changed_fields = {}
for field, content in fields.items():
# Check if field has translation that differs from original
if (isinstance(content, dict) and 'original' in content and
content.get('translation') and
content['translation'] != (content['original'] if isinstance(content['original'], str)
else content['original'].get('text', ''))):
# Decode special characters
original_struct = content['original']
codes = original_struct.get('codes', []) if isinstance(original_struct, dict) else []
final_text = decode_special_chars(content['translation'], codes)
changed_fields[field] = final_text
if changed_fields:
changes_to_apply.append({
'table': table_name,
'row_id': int(row_id_str),
'fields': changed_fields
})
return changes_to_apply
def _apply_database_changes(cursor, repo_map, changes_to_apply):
    """Apply translation changes to database.

    Each change is applied by reading the row's blob, converting it to a
    dict, overriding the changed fields, re-serializing it and writing it
    back. Rows that cannot be updated are skipped; the first failure of each
    table and the total number of failed rows are reported instead of being
    dropped silently.

    Args:
        cursor: SQLite cursor
        repo_map (dict): Repository mapping information, keyed by table name
        changes_to_apply (list): List of changes to apply

    Returns:
        int: Number of successfully updated entries
    """
    updated_count = 0
    failed_count = 0
    skipped_tables = set()  # tables already warned about as unusable
    failed_tables = set()  # tables whose first row failure was already printed
    for change in tqdm(changes_to_apply, desc="Applying changes"):
        table_name = change['table']
        row_id = change['row_id']
        fields = change['fields']
        # Skip if table not in repository map
        if table_name not in repo_map:
            if table_name not in skipped_tables:
                print(f"\nWARNING: Table '{table_name}' not found in repository map. Skipping...")
                skipped_tables.add(table_name)
            continue
        try:
            repo_info = repo_map[table_name]
            # Get schema class
            module_path = SCHEMA_LOCATION_MAP.get(repo_info['blob_schema_class'])
            if not module_path:
                if table_name not in skipped_tables:
                    print(f"\nWARNING: Schema class '{repo_info['blob_schema_class']}' not found. Skipping table '{table_name}'...")
                    skipped_tables.add(table_name)
                continue
            schema_module = importlib.import_module(module_path)
            schema_class = getattr(schema_module, repo_info['blob_schema_class'])
            get_root_method = getattr(schema_class, f"GetRootAs{repo_info['blob_schema_class']}")
            # Check if table exists in database
            cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
                (table_name,)
            )
            if not cursor.fetchone():
                if table_name not in skipped_tables:
                    print(f"\nWARNING: Table '{table_name}' does not exist in database. Skipping...")
                    skipped_tables.add(table_name)
                continue
            # Get and update data
            cursor.execute(f'SELECT "{BLOB_COLUMN}" FROM "{table_name}" WHERE rowid = ?', (row_id,))
            result = cursor.fetchone()
            if not result or not result[0]:
                continue
            # Parse FlatBuffer data
            fbs_obj = get_root_method(result[0], 0)
            data_dict = flatbuffer_to_dict(fbs_obj)
            data_dict.update(fields)
            # Rebuild and save
            builder = flatbuffers.Builder(1024)
            new_offset = dict_to_flatbuffer(builder, data_dict, schema_class)
            builder.Finish(new_offset)
            cursor.execute(
                f'UPDATE "{table_name}" SET "{BLOB_COLUMN}" = ? WHERE rowid = ?',
                (bytes(builder.Output()), row_id)
            )
            updated_count += 1
        except sqlite3.OperationalError as e:
            # Handle SQL errors (e.g., table doesn't exist)
            if table_name not in skipped_tables:
                print(f"\nWARNING: Database error for table '{table_name}': {e}. Skipping...")
                skipped_tables.add(table_name)
            continue
        except Exception as e:
            # Best effort: one bad row must not abort the whole patch, but the
            # failure has to be visible. Print only the first error per table
            # to keep the output readable.
            failed_count += 1
            if table_name not in failed_tables:
                print(f"\nWARNING: Failed to update row {row_id} in table '{table_name}': {e}")
                failed_tables.add(table_name)
            continue
    if failed_count:
        print(f"\nWARNING: {failed_count} row(s) could not be updated.")
    return updated_count
# ============================================================================
# SETUP AND UTILITY FUNCTIONS
# ============================================================================
def setup_schema_from_csharp(csharp_file='types.cs', output_fbs='generated_schema.fbs'):
    """Parse C# files and generate FlatBuffers schema using improved parser.

    The C# file is scanned line by line for ``public struct X : IFlatbufferObject``
    (recorded as tables) and ``public enum X`` (recorded as enums). Enums are
    kept only when reachable from a table's field types. The result is written
    as a .fbs file grouped by namespace. Returns None; nothing is written when
    the input file is missing.

    Args:
        csharp_file (str): Path to C# file with type definitions
        output_fbs (str): Output .fbs schema file path
    """
    if not validate_required_files(csharp_file):
        return
    # Imported inside the function; unidecode is used by sanitize_identifier below.
    from unidecode import unidecode
    from collections import defaultdict
    print(f"Parsing C# file: {csharp_file}")
    print("This may take a while for large files...")
    # Configuration
    # Namespace that definitions found outside any C# namespace are moved into.
    DEFAULT_NAMESPACE = 'FlatData'
    # Type mapping from C# to FlatBuffers
    CSHARP_TO_FBS_TYPE_MAP = {
        'long': 'long', 'ulong': 'ulong', 'int': 'int', 'uint': 'uint',
        'short': 'short', 'ushort': 'ushort', 'float': 'float', 'double': 'double',
        'bool': 'bool', 'string': 'string', 'byte': 'ubyte', 'sbyte': 'byte'
    }
    def sanitize_identifier(name):
        """Clean identifier names for FlatBuffers compatibility."""
        # Transliterate via unidecode, then replace anything outside
        # [A-Za-z0-9_.] with an underscore.
        return re.sub(r'[^A-Za-z0-9_.]', '_', unidecode(name))
    def pascal_to_snake_case(name):
        """Convert PascalCase to snake_case."""
        name = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1_\2', name)
        name = re.sub(r'([a-z\d])([A-Z])', r'\1_\2', name)
        name = name.replace('-', '_')
        return name.lower()
    def csharp_to_fbs_type(csharp_type):
        """Convert C# type to FlatBuffers type."""
        # A missing type (e.g. an enum without an explicit base) maps to 'int'.
        if csharp_type is None:
            return 'int'
        # Remove nullable indicators
        csharp_type = csharp_type.replace('?', '')
        # Check direct mappings
        if csharp_type in CSHARP_TO_FBS_TYPE_MAP:
            return CSHARP_TO_FBS_TYPE_MAP[csharp_type]
        # Handle custom types
        return sanitize_identifier(csharp_type)
    def find_full_type_name(base_type, current_ns, all_defs):
        """Find the full qualified name for a type reference."""
        # Returns the first matching key of all_defs, or None if nothing matches.
        # Try current namespace first
        if f"{current_ns}.{base_type}" in all_defs:
            return f"{current_ns}.{base_type}"
        # Try default namespace
        if f"{DEFAULT_NAMESPACE}.{base_type}" in all_defs:
            return f"{DEFAULT_NAMESPACE}.{base_type}"
        # Try global scope
        if base_type in all_defs:
            return base_type
        # Search in all namespaces
        for name in all_defs:
            if name.endswith(f".{base_type}"):
                return name
        return None
    # Parse C# file and extract table and enum definitions
    # all_definitions maps "<namespace>.<name>" to a dict describing the definition.
    all_definitions = {}
    with open(csharp_file, 'r', encoding='utf-8') as f:
        # Parser state: namespace of the current line, kind of block being
        # read ('table', 'enum' or None) and the fields collected for it.
        current_namespace = "_GLOBAL_"
        in_block = None
        current_name = None
        current_fields = []
        current_enum_base_type = 'int'
        seen_enum_values = set()
        print("Parsing C# file line by line...")
        line_count = 0
        for line in f:
            line_count += 1
            if line_count % 100000 == 0:
                print(f" Processed {line_count:,} lines...")
            line = line.strip()
            # Parse namespace declarations
            ns_match = re.match(r'namespace (\S+)', line)
            if ns_match:
                current_namespace = ns_match.group(1).replace(';', '')
                continue
            # End of block
            # Any line starting with '}' while inside a block closes that block.
            # NOTE(review): a closing brace of a nested body on its own line
            # would end the definition early as well - confirm against the input.
            if line.startswith('}') and in_block:
                full_name = f"{current_namespace}.{current_name}"
                if in_block == 'table':
                    all_definitions[full_name] = {
                        'type': 'table',
                        'ns': current_namespace,
                        'name': current_name,
                        'fields': current_fields
                    }
                elif in_block == 'enum':
                    all_definitions[full_name] = {
                        'type': 'enum',
                        'ns': current_namespace,
                        'name': current_name,
                        'base': current_enum_base_type,
                        'fields': current_fields
                    }
                in_block = None
                current_fields = []
                continue
            # Parse struct/table definitions
            table_match = re.search(r'public struct (\w+) : IFlatbufferObject', line)
            enum_match = re.search(r'public enum (\w+)(?:\s*:\s*(\w+))?', line)
            if table_match:
                in_block = 'table'
                current_name = sanitize_identifier(table_match.group(1))
                continue
            elif enum_match:
                in_block = 'enum'
                current_name = sanitize_identifier(enum_match.group(1))
                csharp_base_type = enum_match.group(2)
                current_enum_base_type = csharp_to_fbs_type(csharp_base_type)
                seen_enum_values.clear()
                continue
            if not in_block:
                continue
            # Parse enum fields
            if in_block == 'enum':
                field_match = re.match(r'(\w+)\s*=\s*(-?\d+)', line)
                if field_match:
                    field_name = sanitize_identifier(field_match.group(1))
                    field_value = int(field_match.group(2))
                    # Only the first member seen for a given numeric value is kept.
                    if field_value not in seen_enum_values:
                        seen_enum_values.add(field_value)
                        current_fields.append(f'{field_name} = {field_value}')
                continue
            # Parse table fields
            if in_block == 'table':
                if not line.startswith('public'):
                    continue
                # Parse vector methods
                # A public method taking a single int parameter is recorded as
                # a vector field of the method's (generic argument or plain)
                # return type.
                vec_match = re.search(
                    r'public\s+(?:[^\s<]+<(\S+)>|(\S+))\s+(\w+)\s*\(int\s+\w+\)',
                    line
                )
                if vec_match:
                    csharp_type = vec_match.group(1) if vec_match.group(1) else vec_match.group(2)
                    fbs_type = csharp_to_fbs_type(csharp_type)
                    current_fields.append({
                        'original': sanitize_identifier(vec_match.group(3)),
                        'type': f'[{fbs_type}]'
                    })
                    continue
                # Parse property definitions
                prop_match = re.search(
                    r'public\s+(?:Nullable<(\S+)>|ArraySegment<byte>|(\S+))\s+(\w+)\s*{',
                    line
                )
                if prop_match:
                    nullable_type, full_type, csharp_name = prop_match.groups()
                    csharp_type = nullable_type if nullable_type else full_type
                    # Skip internal FlatBuffers fields
                    if csharp_name == 'ByteBuffer' or csharp_name.endswith('Length'):
                        continue
                    # Determine field type
                    # NOTE(review): the ArraySegment<byte> alternative of the
                    # regex has no capture group, so csharp_type is None in
                    # that case and this comparison looks unreachable - confirm.
                    if csharp_type == 'ArraySegment<byte>':
                        field_type = '[ubyte]'
                    else:
                        field_type = csharp_to_fbs_type(csharp_type)
                    current_fields.append({
                        'original': sanitize_identifier(csharp_name),
                        'type': field_type
                    })
                    continue
    print(f"Parsed {len(all_definitions)} definitions from {line_count:,} lines")
    # Handle global namespace
    # Definitions parsed outside any namespace are re-keyed under DEFAULT_NAMESPACE.
    if "_GLOBAL_" in {d['ns'] for d in all_definitions.values()}:
        for name, data in list(all_definitions.items()):
            if data['ns'] == "_GLOBAL_":
                new_name = f"{DEFAULT_NAMESPACE}.{data['name']}"
                all_definitions[new_name] = data
                data['ns'] = DEFAULT_NAMESPACE
                del all_definitions[name]
    # Filter and resolve dependencies
    print("Resolving dependencies...")
    # Every table is a root; other definitions are kept only if some table
    # field refers to them.
    root_types = {name for name, data in all_definitions.items() if data['type'] == 'table'}
    used_types = set()
    queue = list(root_types)
    while queue:
        type_name = queue.pop(0)
        if type_name in used_types or type_name not in all_definitions:
            continue
        used_types.add(type_name)
        data = all_definitions[type_name]
        if data['type'] == 'table':
            for field in data['fields']:
                # Strip the vector brackets to get the element type name.
                base_type = field['type'].strip('[]')
                found_dep = find_full_type_name(base_type, data['ns'], all_definitions)
                if found_dep and found_dep not in used_types:
                    queue.append(found_dep)
    final_definitions = {name: data for name, data in all_definitions.items() if name in used_types}
    # Separate tables and enums
    tables = {name: data for name, data in final_definitions.items() if data['type'] == 'table'}
    enums = {name: data for name, data in final_definitions.items() if data['type'] == 'enum'}
    print(f"Final schema: {len(tables)} tables, {len(enums)} enums")
    # Generate FlatBuffers schema file
    print(f"Generating schema file: {output_fbs}")
    with open(output_fbs, 'w', encoding='utf-8') as f:
        f.write('// Auto-generated FlatBuffers schema\n')
        f.write('// Field order is preserved. Key attributes are properly handled.\n\n')
        # Group by namespace
        defs_by_ns = defaultdict(lambda: {'enums': [], 'tables': []})
        for name, data in enums.items():
            defs_by_ns[data['ns']]['enums'].append(data)
        for name, data in tables.items():
            defs_by_ns[data['ns']]['tables'].append(data)
        # Namespaces are written in sorted order.
        for ns, data in sorted(defs_by_ns.items()):
            f.write(f'// ----- NAMESPACE: {ns} -----\n')
            f.write(f'namespace {ns};\n\n')
            # Enums
            if data['enums']:
                f.write('// --- Enums ---\n')
                # Enums are sorted by name; tables below keep insertion order.
                for definition in sorted(data['enums'], key=lambda x: x['name']):
                    f.write(f'enum {definition["name"]} : {definition["base"]} {{\n')
                    for field in definition['fields']:
                        f.write(f' {field},\n')
                    f.write('}\n\n')
            # Tables
            if data['tables']:
                f.write('// --- Tables ---\n')
                for definition in data['tables']:
                    f.write(f'table {definition["name"]} {{\n')
                    # Handle field naming conflicts
                    # Group original names by their snake_case form to detect
                    # fields that would collide after conversion.
                    snake_to_original = defaultdict(list)
                    for field in definition['fields']:
                        snake_to_original[pascal_to_snake_case(field['original'])].append(field['original'])
                    # Track if key attribute was added
                    key_field_added = False
                    for field in definition['fields']:
                        snake_name = pascal_to_snake_case(field['original'])
                        # Colliding fields keep their original name; all
                        # others are written in snake_case.
                        field_name = (field['original'] if len(snake_to_original[snake_name]) > 1
                                      else snake_name)
                        is_array = field['type'].startswith('[')
                        base_type = field['type'].strip('[]')
                        final_type_str = field['type']
                        # Resolve type references
                        full_dep_name = find_full_type_name(base_type, definition['ns'], final_definitions)
                        if full_dep_name:
                            dep_data = final_definitions[full_dep_name]
                            simple_name = dep_data['name']
                            # Qualify the type only when it lives in another namespace.
                            if dep_data['ns'] != definition['ns']:
                                final_type_str = f"{dep_data['ns']}.{simple_name}"
                            else:
                                final_type_str = simple_name
                            if is_array:
                                final_type_str = f"[{final_type_str}]"
                        # Add key attribute for primary key fields
                        # Only the first non-array field named 'key' or 'id'
                        # (case-insensitive) of a table gets the attribute.
                        key_suffix = ""
                        if (not key_field_added and
                                field_name.lower() in ['key', 'id'] and
                                not is_array):
                            key_suffix = " (key)"
                            key_field_added = True
                        f.write(f' {field_name}:{final_type_str}{key_suffix};\n')
                    f.write('}\n\n')
    print(f"Success! Generated {len(tables)} tables and {len(enums)} enums.")
def setup_repository_mapping(csharp_file='types.cs', output_json='repository_map.json'):
    """Derive the repository -> table/schema mapping from a C# source file.

    Repository classes (``BaseExcelRepository<_, DbSchema, BlobSchema>``) and
    database schema classes (``: BaseDBSchema``) are located with regular
    expressions. Each repository is written to the JSON file with its table
    name (the database schema class name), the auto-properties of that schema
    class as key columns, and its blob schema class.

    Args:
        csharp_file (str): Path to C# file with type definitions
        output_json (str): Output JSON mapping file path
    """
    if not validate_required_files(csharp_file):
        return

    def _closing_brace(text, open_pos):
        """Return the index of the brace closing the one at open_pos, or -1."""
        depth = 1
        for idx in range(open_pos + 1, len(text)):
            char = text[idx]
            if char == '{':
                depth += 1
            elif char == '}':
                depth -= 1
                if depth == 0:
                    return idx
        return -1

    print(f"Analyzing '{csharp_file}' to create repository mapping...")
    print("Reading large C# file for repository mapping...")
    with open(csharp_file, 'r', encoding='utf-8') as source:
        content = source.read()
    print(f"File content loaded: {len(content):,} characters")

    repo_pattern = re.compile(
        r'public class (\w+)\s*:\s*BaseExcelRepository<[^,]+,\s*([^,]+),\s*([^>]+)>',
        re.MULTILINE
    )
    db_schema_pattern = re.compile(r'public class (\w+)\s*:\s*BaseDBSchema', re.MULTILINE)
    prop_pattern = re.compile(r'public\s+([\w.<>\[\]?]+)\s+(\w+)\s*\{\s*get;\s*set;\s*\}')

    # Pass 1: repository classes and the two schema classes each one names.
    print("Parsing repository definitions...")
    repositories = OrderedDict()
    for found in repo_pattern.finditer(content):
        repositories[found.group(1)] = {
            'db_schema_class': found.group(2).strip(),
            'blob_schema_class': found.group(3).strip()
        }
    print(f"Found {len(repositories)} repository classes")

    # Pass 2: database schema classes and the properties inside their bodies.
    print("Parsing database schema definitions...")
    db_schemas = OrderedDict()
    for found in db_schema_pattern.finditer(content):
        body_open = content.find('{', found.end())
        if body_open == -1:
            continue
        body_close = _closing_brace(content, body_open)
        if body_close <= body_open:
            # No matching closing brace: the class is not recorded.
            continue
        class_body = content[body_open + 1:body_close]
        db_schemas[found.group(1)] = [
            {'name': prop.group(2), 'type': prop.group(1)}
            for prop in prop_pattern.finditer(class_body)
        ]
    print(f"Found {len(db_schemas)} database schema classes")

    # Merge both passes; a schema class that was not found yields no key columns.
    final_map = OrderedDict()
    for repo_name, repo_data in repositories.items():
        schema_key = repo_data['db_schema_class']
        final_map[repo_name] = {
            'table_name': schema_key,
            'key_columns': db_schemas.get(schema_key, []),
            'blob_schema_class': repo_data['blob_schema_class']
        }

    print(f"Saving repository mapping to '{output_json}'...")
    with open(output_json, 'w', encoding='utf-8') as target:
        json.dump(final_map, target, indent=2, ensure_ascii=False)
    print(f"Success! Repository mapping created with {len(final_map)} repositories.")
    print(f"You can now use '{output_json}' as the source of truth for database operations.")
    print(f"Mapping saved to: {output_json}")
def preprocess_flatbuffer_schema(input_fbs, output_fbs=None):
    """Append an underscore to schema field names that are reserved words.

    A listed word is renamed wherever it appears as a whole word directly
    followed by ``: <type>``. The result is always written, either to
    ``output_fbs`` or back to ``input_fbs``.

    Args:
        input_fbs (str): Input schema file
        output_fbs (str): Output schema file (if None, modifies in place)
    """
    if not validate_required_files(input_fbs):
        return
    reserved_words = (
        'self', 'class', 'def', 'return', 'import', 'from', 'as',
        'if', 'elif', 'else', 'while', 'for', 'in', 'is', 'not',
        'and', 'or', 'True', 'False', 'None', 'pass', 'break',
        'continue', 'try', 'except', 'finally', 'raise', 'with',
        'yield', 'lambda', 'global', 'nonlocal',
    )
    with open(input_fbs, 'r', encoding='utf-8') as source:
        content = source.read()

    renamed_any = False
    for keyword in reserved_words:
        # subn reports how many replacements happened, so no separate search is needed.
        content, hits = re.subn(rf'\b({keyword})(\s*:\s*\w+)', r'\1_\2', content)
        if hits:
            renamed_any = True
            print(f" Renamed '{keyword}' -> '{keyword}_'")

    output_file = output_fbs or input_fbs
    with open(output_file, 'w', encoding='utf-8') as target:
        target.write(content)
    if renamed_any:
        print(f"Preprocessed schema saved to: {output_file}")
    else:
        print("No reserved keywords found in schema.")
def generate_flatbuffer_python(fbs_file, flatc_exe='flatc.exe', output_dir='.'):
    """Generate Python modules from FlatBuffer schema.

    Runs ``flatc --python --gen-object-api -o <output_dir> <fbs_file>`` and
    prints whether it succeeded. The compiler is started directly with an
    argument list (no shell), so paths containing spaces or shell
    metacharacters are passed through unchanged.

    Args:
        fbs_file (str): FlatBuffer schema file (.fbs)
        flatc_exe (str): Path to flatc compiler
        output_dir (str): Output directory for generated Python files
    """
    import subprocess

    if not validate_required_files(fbs_file, flatc_exe):
        return
    print(f"Generating Python modules from: {fbs_file}")
    # Run flatc compiler
    cmd = [
        # The path was just validated to exist; make it absolute so exactly
        # that file is executed rather than a same-named program on PATH.
        os.path.abspath(flatc_exe),
        '--python',
        '--gen-object-api',
        '-o', output_dir,
        fbs_file
    ]
    try:
        completed = subprocess.run(cmd, check=False)
    except OSError as exc:
        # e.g. the file exists but is not executable on this platform
        print(f"ERROR: Could not run flatc: {exc}")
        return
    if completed.returncode == 0:
        print("Success! Python modules generated.")
    else:
        print(f"ERROR: flatc failed with code {completed.returncode}")
def fix_flatbuffer_reserved_names(directory='MX'):
    """Fix Python reserved keywords in generated FlatBuffer files.

    Every ``*.py`` file below ``directory`` is scanned for a multi-line
    ``__init__`` signature that has a parameter named 'self', 'class' or
    'import' (after the leading ``self,``). Such parameters, and the matching
    ``self.<name> = <name>`` assignments, get an underscore appended. Files
    are rewritten only when their content changed.

    Args:
        directory (str): Directory containing generated Python files
    """
    from pathlib import Path
    if not os.path.exists(directory):
        print(f"ERROR: Directory '{directory}' not found")
        return
    print(f"Scanning {directory} for reserved keyword issues...")
    # Offending parameter name -> replacement name.
    reserved_map = {'self': 'self_', 'class': 'class_', 'import': 'import_'}
    fixed_count = 0
    for py_file in Path(directory).rglob('*.py'):
        try:
            with open(py_file, 'r', encoding='utf-8') as f:
                content = f.read()
            # Kept to detect whether any replacement changed the file.
            original = content
            for reserved, new_name in reserved_map.items():
                # Fix parameter names
                # Group 1 spans from "def __init__(" up to the indentation in
                # front of the offending parameter; group 2 is the "=" after it.
                pattern = rf'(def __init__\([^)]*\n\s+self,\n(?:[^)]*\n)*?\s+){reserved}(\s*=)'
                if re.search(pattern, content):
                    content = re.sub(pattern, rf'\1{new_name}\2', content)
                    # Rename the corresponding attribute assignment as well.
                    content = content.replace(f'self.{reserved} = {reserved}', f'self.{new_name} = {new_name}')
                    print(f" Fixed: {py_file.name}")
            if content != original:
                with open(py_file, 'w', encoding='utf-8') as f:
                    f.write(content)
                fixed_count += 1
        except Exception as e:
            # A file that cannot be read or written is reported; the scan continues.
            print(f" ERROR in {py_file}: {e}")
    print(f"\nFixed {fixed_count} file(s).")
# ============================================================================
# CSV EXPORT/IMPORT FUNCTIONS
# ============================================================================
def export_to_csv(json_file, csv_file):
    """Flatten a translation JSON file into CSV rows and write them out.

    Every field whose value is a dict with an ``'original'`` key becomes one
    row. Identical source texts share a group id, numbered from 1 in the
    order the texts are first encountered.

    Args:
        json_file (str): Translation JSON to read.
        csv_file (str): Destination path handed to the CSV writer.
    """
    if not validate_required_files(json_file):
        return

    with open(json_file, 'r', encoding='utf-8') as handle:
        translations = json.load(handle)

    print(f"Exporting translations to '{csv_file}'...")

    group_ids = {}
    records = []
    for table, rows in translations.items():
        for row_key, fields in rows.items():
            for field_name, entry in fields.items():
                if not (isinstance(entry, dict) and 'original' in entry):
                    continue

                # 'original' is either a bare string or a dict carrying the
                # text together with its codes.
                source = entry['original']
                if isinstance(source, str):
                    text, codes = source, []
                else:
                    text = source.get('text', '')
                    codes = source.get('codes', [])

                # A new text receives the next free id: 1, 2, 3, ...
                group = group_ids.setdefault(text, len(group_ids) + 1)

                has_codes = 'Y' if codes else ''
                codes_json = json.dumps(codes) if codes else ''
                records.append([
                    group, text, entry.get('translation', ''),
                    table, row_key, field_name, has_codes, codes_json,
                ])

    _write_csv_files(csv_file, records, group_ids)
    print(f"Export completed! Unique texts: {len(group_ids)}")
def _write_csv_files(csv_file, all_rows, text_groups):
"""Write main CSV and simplified translator version."""
# Main CSV
with open(csv_file, 'w', encoding='utf-8-sig', newline='') as f:
writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_ALL)
writer.writerow(['GroupID', 'Original', 'Translation', 'SQLTable', 'RowID', 'Field', 'HasCodes', 'Codes'])
writer.writerows(sorted(all_rows, key=lambda x: x[0]))
# Simplified translator CSV
translator_csv = csv_file.replace('.csv', '_for_translators.csv')
unique_texts = {}
for row in all_rows:
text = row[1]
if text not in unique_texts:
unique_texts[text] = [text_groups[text], text, row[2], set()]
unique_texts[text][3].add(row[3])
with open(translator_csv, 'w', encoding='utf-8-sig', newline='') as f:
writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_ALL)
writer.writerow(['GroupID', 'Original', 'Translation', 'Tables'])
for text, info in sorted(unique_texts.items(), key=lambda x: x[1][0]):
writer.writerow([info[0], info[1], info[2], '|'.join(sorted(info[3]))])
print(f"Translator version: {translator_csv}")
def import_from_csv(csv_file, json_file, original_json_file=None):
    """Import translations from a CSV file into a translation JSON file.

    Two CSV layouts are understood:

    * the full export, where each row addresses one ``table:row:field`` and
      is applied only if its original text still matches the JSON entry;
    * the simplified translator file, where a translation is applied to
      every entry with the same original text.

    The simplified layout is assumed when the file name contains
    ``_for_translators`` or when the header is the four-column
    ``GroupID;Original;Translation;Tables`` one.

    Args:
        csv_file (str): CSV file with translations.
        json_file (str): JSON file to write the merged result to.
        original_json_file (str): JSON file providing the structure to merge
            into; defaults to ``json_file``.
    """
    if not original_json_file:
        original_json_file = json_file

    if not validate_required_files(csv_file, original_json_file):
        return

    # Load data
    with open(original_json_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Look only at the file name: a directory that happens to contain the
    # marker must not switch a full export into simplified mode.
    is_simple = '_for_translators' in os.path.basename(csv_file)

    # Load translations
    translations = {}
    with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
        reader = csv.reader(f, delimiter=';', quoting=csv.QUOTE_ALL)
        # A default avoids StopIteration on a completely empty file.
        header = next(reader, None)
        if header is None:
            print(f"WARNING: '{csv_file}' is empty; no translations to import.")
        elif len(header) == 4 and header[3] == 'Tables':
            # Simplified file that was renamed by the translator.
            is_simple = True

        for row in reader:
            if len(row) >= 3 and row[2]:  # Has translation
                if is_simple:
                    translations[row[1]] = row[2]  # original -> translation
                elif len(row) >= 6:
                    key = f"{row[3]}:{row[4]}:{row[5]}"  # table:row:field
                    codes = json.loads(row[7]) if len(row) > 7 and row[7] else []
                    translations[key] = {'original': row[1], 'translation': row[2], 'codes': codes}

    # Apply translations
    updated_count = 0
    for table_name, table_data in data.items():
        for row_id_str, row_data in table_data.items():
            for field, content in row_data.items():
                if not (isinstance(content, dict) and 'original' in content):
                    continue

                original = content['original']
                text = original if isinstance(original, str) else original.get('text', '')

                new_translation = None
                if is_simple:
                    new_translation = translations.get(text)
                else:
                    entry = translations.get(f"{table_name}:{row_id_str}:{field}")
                    # Ignore rows whose source text no longer matches the JSON.
                    if entry is not None and entry['original'] == text:
                        new_translation = entry['translation']

                if new_translation and new_translation != content.get('translation', ''):
                    content['translation'] = new_translation
                    updated_count += 1

    # Save result
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

    print(f"Success! Updated {updated_count} translations.")
def validate_csv(csv_file):
    """Check a translation CSV for groups with conflicting translations.

    Rows are grouped by their first column (group id). A group is reported
    as an issue when its rows carry more than one distinct non-empty
    translation. A summary is printed, and when issues exist a detailed
    report is written next to the CSV as ``<name>_issues.txt``.

    Args:
        csv_file (str): CSV file to validate.
    """
    if not validate_required_files(csv_file):
        return

    print(f"Validating '{csv_file}'...")

    groups = {}
    total_rows = 0

    with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
        reader = csv.reader(f, delimiter=';', quoting=csv.QUOTE_ALL)
        next(reader, None)  # Skip header; tolerate a completely empty file
        for row in reader:
            if len(row) < 3:
                continue
            total_rows += 1
            group_id, original, translation = row[0], row[1], row[2]
            if group_id not in groups:
                groups[group_id] = {'original': original, 'translations': set()}
            if translation:
                groups[group_id]['translations'].add(translation)

    # Find issues: groups translated in more than one way. The translations
    # are sorted so the console output and report are reproducible.
    issues = [
        {
            'group_id': group_id,
            'original': info['original'],
            'translations': sorted(info['translations']),
        }
        for group_id, info in groups.items()
        if len(info['translations']) > 1
    ]
    translated_groups = sum(1 for info in groups.values() if info['translations'])

    # Report results
    print("\n=== VALIDATION RESULTS ===")
    print(f"Total rows: {total_rows}")
    print(f"Unique groups: {len(groups)}")
    print(f"Translated groups: {translated_groups}")
    print(f"Untranslated groups: {len(groups) - translated_groups}")

    if not issues:
        print("\nNo issues found!")
        return

    print(f"\n=== FOUND ISSUES: {len(issues)} ===")
    for issue in issues[:5]:
        print(f"\nGroup {issue['group_id']}: {issue['original'][:50]}...")
        print("Different translations:")
        for trans in issue['translations']:
            print(f" - {trans}")
    if len(issues) > 5:
        print(f"\n... and {len(issues) - 5} more issues")

    # Save detailed report. Build the name from the extension-less path:
    # substituting '.csv' in the string leaves a name without '.csv'
    # unchanged, and the report would then overwrite the CSV being validated.
    root, _ext = os.path.splitext(csv_file)
    report_file = f"{root}_issues.txt"
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write("=== ISSUE REPORT ===\n\n")
        for issue in issues:
            f.write(f"Group {issue['group_id']}: {issue['original']}\n")
            f.write("Different translations:\n")
            for trans in issue['translations']:
                f.write(f" - {trans}\n")
            f.write("\n")
    print(f"\nDetailed report saved to: {report_file}")
def _build_arg_parser():
    """Create the argument parser with one sub-command per tool operation."""
    parser = argparse.ArgumentParser(
        description="Game localization tool for Blue Archive."
    )
    subparsers = parser.add_subparsers(
        dest='command',
        required=True,
        help='Available commands'
    )

    # Extract command - extract strings from database to JSON
    parser_extract = subparsers.add_parser(
        'extract',
        help='Extract translatable strings from database to JSON file.'
    )
    parser_extract.add_argument(
        '--filter',
        type=str,
        help='Filter for extraction. Formats: is_ascii, table_name:TableName, contains_text:Word'
    )
    parser_extract.add_argument(
        '--output',
        type=str,
        default=DEFAULT_JSON_FILE,
        help=f'Output JSON file name (default: {DEFAULT_JSON_FILE})'
    )
    parser_extract.add_argument(
        '--update-from',
        type=str,
        help='Path to existing JSON file to merge translations from.'
    )

    # Patch command - apply translations to database
    parser_patch = subparsers.add_parser(
        'patch',
        help='Apply translations from JSON file to the database.'
    )
    parser_patch.add_argument(
        '--input',
        type=str,
        default=DEFAULT_JSON_FILE,
        help=f'Input JSON file name (default: {DEFAULT_JSON_FILE})'
    )

    # CSV export command - convert JSON to CSV for translators
    parser_export_csv = subparsers.add_parser(
        'export_csv',
        help='Export JSON translations to CSV format for translators.'
    )
    parser_export_csv.add_argument(
        '--input',
        type=str,
        default=DEFAULT_JSON_FILE,
        help=f'Input JSON file (default: {DEFAULT_JSON_FILE})'
    )
    parser_export_csv.add_argument(
        '--output',
        type=str,
        default='translations.csv',
        help='Output CSV file (default: translations.csv)'
    )

    # CSV import command - convert CSV back to JSON
    parser_import_csv = subparsers.add_parser(
        'import_csv',
        help='Import translations from CSV back to JSON format.'
    )
    parser_import_csv.add_argument(
        '--input',
        type=str,
        default='translations.csv',
        help='Input CSV file (default: translations.csv)'
    )
    parser_import_csv.add_argument(
        '--output',
        type=str,
        default=DEFAULT_JSON_FILE,
        help=f'Output JSON file (default: {DEFAULT_JSON_FILE})'
    )
    parser_import_csv.add_argument(
        '--original',
        type=str,
        help='Original JSON file for structure reference (if different from output)'
    )

    # CSV validation command - check CSV for issues
    parser_validate_csv = subparsers.add_parser(
        'validate_csv',
        help='Validate CSV file for consistency and translation issues.'
    )
    parser_validate_csv.add_argument(
        '--input',
        type=str,
        default='translations.csv',
        help='CSV file to validate (default: translations.csv)'
    )

    # Setup schema command - generate FlatBuffer schema from C#
    parser_setup_schema = subparsers.add_parser(
        'setup_schema',
        help='Parse C# files and generate FlatBuffer schema (.fbs file).'
    )
    parser_setup_schema.add_argument(
        '--csharp',
        type=str,
        default='types.cs',
        help='Input C# file with type definitions (default: types.cs)'
    )
    parser_setup_schema.add_argument(
        '--output',
        type=str,
        default='generated_schema.fbs',
        help='Output .fbs schema file (default: generated_schema.fbs)'
    )

    # Setup mapping command - create repository mapping
    parser_setup_mapping = subparsers.add_parser(
        'setup_mapping',
        help='Create repository mapping from C# files.'
    )
    parser_setup_mapping.add_argument(
        '--csharp',
        type=str,
        default='types.cs',
        help='Input C# file (default: types.cs)'
    )
    parser_setup_mapping.add_argument(
        '--output',
        type=str,
        default='repository_map.json',
        help='Output mapping JSON file (default: repository_map.json)'
    )

    # Generate FlatBuffers command - generate Python modules
    parser_gen_fb = subparsers.add_parser(
        'generate_flatbuffers',
        help='Generate Python modules from FlatBuffer schema with preprocessing.'
    )
    parser_gen_fb.add_argument(
        '--schema',
        type=str,
        default='generated_schema.fbs',
        help='Input .fbs schema file (default: generated_schema.fbs)'
    )
    parser_gen_fb.add_argument(
        '--flatc',
        type=str,
        default='flatc.exe',
        help='Path to flatc compiler (default: flatc.exe)'
    )
    parser_gen_fb.add_argument(
        '--no-preprocess',
        action='store_true',
        help='Skip preprocessing (fixing reserved keywords)'
    )

    # Fix reserved names command - fix generated Python files
    parser_fix_names = subparsers.add_parser(
        'fix_reserved_names',
        help='Fix Python reserved keywords in generated FlatBuffer files.'
    )
    parser_fix_names.add_argument(
        '--directory',
        type=str,
        default='MX',
        help='Directory with generated Python files (default: MX)'
    )

    return parser


def main():
    """Main CLI entry point for the BA-translator tool.

    Parses the command line and runs the selected sub-command.

    Returns:
        int: 0 on success, 1 when the command raised an exception, 130 when
        the user interrupted the run with Ctrl+C.
    """
    parser = _build_arg_parser()

    # Parse arguments and execute appropriate command
    args = parser.parse_args()

    try:
        if args.command == 'extract':
            extract_strings(args.output, args.filter, args.update_from)
        elif args.command == 'patch':
            patch_database(args.input)
        elif args.command == 'export_csv':
            export_to_csv(args.input, args.output)
        elif args.command == 'import_csv':
            import_from_csv(args.input, args.output, args.original)
        elif args.command == 'validate_csv':
            validate_csv(args.input)
        elif args.command == 'setup_schema':
            setup_schema_from_csharp(args.csharp, args.output)
        elif args.command == 'setup_mapping':
            setup_repository_mapping(args.csharp, args.output)
        elif args.command == 'generate_flatbuffers':
            if not args.no_preprocess:
                print("Preprocessing schema to fix reserved keywords...")
                preprocess_flatbuffer_schema(args.schema)
            generate_flatbuffer_python(args.schema, args.flatc)
        elif args.command == 'fix_reserved_names':
            fix_flatbuffer_reserved_names(args.directory)
        else:
            # Defensive only: argparse already rejects unknown sub-commands.
            print(f"ERROR: Unknown command '{args.command}'")
            parser.print_help()
    except KeyboardInterrupt:
        print("\nOperation cancelled by user.")
        # A cancelled run is not a success; 130 is the conventional status
        # for termination by SIGINT (128 + 2).
        return 130
    except Exception as e:
        # Top-level boundary: report the failure and signal it to the caller.
        print(f"ERROR: {str(e)}")
        return 1

    return 0
if __name__ == "__main__":
    # Hand main()'s status code to the interpreter so that a failed command
    # yields a non-zero exit status instead of always exiting with 0.
    sys.exit(main())