Files
BA-translator/BAtranslator.py
2025-09-30 17:29:03 +03:00

1340 lines
47 KiB
Python

#!/usr/bin/env python3
"""
BA Translator - A tool for extracting and applying translations for Blue Archive.
"""
import sys
import os
import sqlite3
import json
from collections import OrderedDict, defaultdict
import importlib
import shutil
import re
import argparse
from tqdm import tqdm
import flatbuffers
import csv
from pathlib import Path
# Add script directory to path for schema imports
script_dir = Path(__file__).parent.absolute()
if str(script_dir) not in sys.path:
sys.path.append(str(script_dir))
# Global configuration
DB_FILE = 'ExcelDB.db'
DB_BACKUP_FILE = 'ExcelDB.db.bak'
DEFAULT_JSON_FILE = 'translations.json'
REPO_MAP_FILE = 'repository_map.json'
BLOB_COLUMN = 'Bytes'
# Schema location mapping
SCHEMA_LOCATION_MAP = {}
def build_schema_map():
"""Build a mapping of schema class names to their module paths."""
for root, _, files in os.walk('.'):
for filename in files:
if filename.endswith('.py') and filename != '__init__.py':
class_name = filename[:-3]
relative_path = os.path.relpath(root, '.')
if relative_path == '.':
module_path = class_name
else:
module_path = os.path.normpath(relative_path).replace(os.sep, '.') + '.' + class_name
SCHEMA_LOCATION_MAP[class_name] = module_path
build_schema_map()
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
# Patterns for special character handling
CONTROL_CHAR_PATTERN = re.compile(r'[\x00-\x1f\x7F-\x9F]+')
STRICT_ASCII_FILTER_PATTERN = re.compile(r'^[\s\x21-\x7E{}:]*$')
def validate_required_files(*file_paths):
"""Validate that all required files exist.
Args:
*file_paths: Variable number of file paths to validate
Returns:
bool: True if all files exist, False otherwise
"""
missing_files = []
for file_path in file_paths:
if not os.path.exists(file_path):
missing_files.append(file_path)
if missing_files:
for file_path in missing_files:
print(f"ERROR: Required file '{file_path}' not found.")
return False
return True
def encode_special_chars(text):
"""Encode special control characters in text for safe JSON storage.
Args:
text (str): Input text that may contain control characters
Returns:
tuple: (clean_text, codes_list) where codes_list contains the encoded chars
"""
if not text:
return text, []
codes = CONTROL_CHAR_PATTERN.findall(text)
if not codes:
return text, []
def replacer_func(match):
index = len(replacer_func.codes)
replacer_func.codes.append(match.group(0))
return f"{{C:{index}}}"
replacer_func.codes = []
clean_text = CONTROL_CHAR_PATTERN.sub(replacer_func, text)
return clean_text, replacer_func.codes
def decode_special_chars(clean_text, codes):
"""Decode special characters back into text.
Args:
clean_text (str): Text with encoded placeholders
codes (list): List of original control characters
Returns:
str: Text with control characters restored
"""
if not clean_text or not codes:
return clean_text
placeholder_pattern = re.compile(r'{C:(\d+)}')
def replacer_func(match):
index = int(match.group(1))
if index < len(codes):
return codes[index]
return match.group(0)
return placeholder_pattern.sub(replacer_func, clean_text)
def flatbuffer_to_dict(obj):
"""Recursively convert a FlatBuffer object to a Python dictionary.
Args:
obj: FlatBuffer object to convert
Returns:
dict or primitive: Converted object
"""
if obj is None or isinstance(obj, (int, float, bool, str)):
return obj
if isinstance(obj, bytes):
return obj.decode('utf-8', 'ignore')
result = OrderedDict()
# Get all public methods that look like FlatBuffer accessors
for method_name in dir(obj):
if not method_name[0].isupper():
continue
method = getattr(obj, method_name)
if not callable(method) or method.__code__.co_argcount != 1:
continue
try:
value = method()
# Handle array-like values
if hasattr(value, 'Length') and callable(getattr(value, 'Length')):
result[method_name] = [
flatbuffer_to_dict(value(i))
for i in range(value.Length())
]
else:
result[method_name] = flatbuffer_to_dict(value)
except Exception:
# Skip methods that fail to call
continue
return result
def dict_to_flatbuffer(builder, data_dict, schema_class):
"""Build a FlatBuffer from a dictionary using the schema class.
Args:
builder: FlatBuffer builder instance
data_dict (dict): Data to serialize
schema_class: FlatBuffer schema class
Returns:
int: Offset of the created object
"""
schema_name = schema_class.__name__
schema_module = sys.modules[schema_class.__module__]
# Determine field order from the schema
field_order = []
if hasattr(schema_class, f'GetRootAs{schema_name}'):
add_methods = [
m for m in dir(schema_module)
if m.startswith(f"{schema_name}Add")
]
field_order = [
m.replace(f"{schema_name}Add", "")
for m in reversed(add_methods)
]
# Pre-create string pointers
string_pointers = {}
for key, value in data_dict.items():
if isinstance(value, str):
string_pointers[key] = builder.CreateString(value)
# Start building the object
start_method = getattr(schema_module, f"{schema_name}Start")
start_method(builder)
# Add fields in the correct order
for field_name in field_order:
if field_name in data_dict:
add_method = getattr(schema_module, f"{schema_name}Add{field_name}")
value = data_dict[field_name]
if field_name in string_pointers:
add_method(builder, string_pointers[field_name])
else:
add_method(builder, value)
# Finish building
end_method = getattr(schema_module, f"{schema_name}End")
return end_method(builder)
def create_translation_memory(old_json_data):
"""Create translation memory from existing JSON data.
Args:
old_json_data (dict): Previously translated data
Returns:
dict: Mapping of original text to translations
"""
memory = {}
print("Creating translation memory from existing data...")
for table_data in old_json_data.values():
for row_data in table_data.values():
for field_content in row_data.values():
if not isinstance(field_content, dict):
continue
if 'original' not in field_content or 'translation' not in field_content:
continue
original_struct = field_content['original']
translation_text = field_content['translation']
# Extract original text
if isinstance(original_struct, str):
original_text = original_struct
else:
original_text = original_struct.get('text')
# Store translation if it exists and differs from original
if translation_text and translation_text != original_text:
memory[original_text] = translation_text
print(f"Translation memory created with {len(memory)} unique translations.")
return memory
# ============================================================================
# MAIN FUNCTIONS
# ============================================================================
def extract_strings(output_file, filter_str=None, update_from=None):
"""Extract translatable strings from the database.
Args:
output_file (str): Path to output JSON file
filter_str (str): Optional filter (e.g., 'is_ascii', 'table_name:TableName')
update_from (str): Path to existing JSON file to merge translations from
"""
# Auto-setup: Create all required files if they don't exist
setup_required = False
# Check if repository_map.json exists
if not os.path.exists(REPO_MAP_FILE):
print(f"Repository map not found. Auto-generating from types.cs...")
setup_required = True
# Check if schema exists
if not os.path.exists('generated_schema.fbs'):
print("FlatBuffer schema not found. Generating from types.cs...")
if not os.path.exists('types.cs'):
print("ERROR: types.cs not found. Cannot auto-generate files.")
print("Please place types.cs in the project directory.")
return
# Generate schema
setup_schema_from_csharp('types.cs', 'generated_schema.fbs')
# Preprocess schema to fix reserved keywords
print("Preprocessing schema to fix Python reserved keywords...")
preprocess_flatbuffer_schema('generated_schema.fbs')
# Generate Python modules
print("Generating Python modules from schema...")
generate_flatbuffer_python('generated_schema.fbs', 'flatc.exe', '.')
print()
# Generate repository mapping
setup_repository_mapping('types.cs', REPO_MAP_FILE)
print()
if setup_required:
print("✓ Auto-setup completed! Proceeding with extraction...\n")
# Validate required files
if not validate_required_files(REPO_MAP_FILE, DB_FILE):
return
# Load existing translations if specified
translation_memory = {}
if update_from:
if os.path.exists(update_from):
with open(update_from, 'r', encoding='utf-8') as f:
old_data = json.load(f)
translation_memory = create_translation_memory(old_data)
else:
print(f"WARNING: Update file '{update_from}' not found.")
# Parse filter
filter_type, filter_value = _parse_filter(filter_str)
# Load repository mapping and connect to database
with open(REPO_MAP_FILE, 'r', encoding='utf-8') as f:
repo_map = json.load(f)
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
translations_dict = OrderedDict()
try:
print("Extracting translatable strings...")
for repo_info in tqdm(repo_map.values(), desc="Processing repositories"):
table_name = repo_info['table_name']
# Apply table filter
if filter_type == 'table_name' and table_name != filter_value:
continue
table_translations = _process_table(cursor, repo_info, filter_type, filter_value, translation_memory)
if table_translations:
translations_dict[table_name] = table_translations
finally:
conn.close()
if not translations_dict:
print("No strings found matching the filter.")
return
# Save results
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(translations_dict, f, indent=2, ensure_ascii=False)
print(f"\nSuccess! Translation data saved to '{output_file}'.")
def _parse_filter(filter_str):
"""Parse filter string into type and value."""
if not filter_str:
return None, None
if ':' in filter_str:
filter_type, filter_value = filter_str.split(':', 1)
else:
filter_type, filter_value = filter_str, None
print(f"Applying filter: type='{filter_type}', value='{filter_value}'")
return filter_type, filter_value
def _process_table(cursor, repo_info, filter_type, filter_value, translation_memory):
"""Process a single table and extract translatable strings."""
table_name = repo_info['table_name']
blob_schema_name = repo_info['blob_schema_class']
try:
# Load schema module
module_path = SCHEMA_LOCATION_MAP.get(blob_schema_name)
if not module_path:
return None
schema_module = importlib.import_module(module_path)
schema_class = getattr(schema_module, blob_schema_name)
get_root_method = getattr(schema_class, f"GetRootAs{blob_schema_name}")
# Check if table exists in database
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table_name,)
)
if not cursor.fetchone():
# Table doesn't exist, skip it silently
return None
# Process table rows
cursor.execute(f'SELECT rowid, "{BLOB_COLUMN}" FROM "{table_name}"')
table_translations = OrderedDict()
for row_id, blob_data in cursor.fetchall():
if not blob_data:
continue
try:
# Parse FlatBuffer and extract strings
fbs_obj = get_root_method(blob_data, 0)
data_dict = flatbuffer_to_dict(fbs_obj)
string_fields = _extract_string_fields(data_dict, filter_type, filter_value, translation_memory)
if string_fields:
table_translations[row_id] = string_fields
except Exception:
# Skip rows that can't be processed
continue
return table_translations if table_translations else None
except (ImportError, AttributeError):
# Skip tables that can't be processed
return None
except sqlite3.OperationalError:
# Handle database errors (e.g., table doesn't exist) silently
return None
def _extract_string_fields(data_dict, filter_type, filter_value, translation_memory):
"""Extract and filter string fields from FlatBuffer data."""
string_fields = OrderedDict()
for field, value in data_dict.items():
if not isinstance(value, str) or not value:
continue
clean_text, codes = encode_special_chars(value)
# Apply content filters
if not _passes_filter(clean_text, filter_type, filter_value):
continue
# Create original entry
original_entry = {"text": clean_text, "codes": codes} if codes else clean_text
# Get existing translation
existing_translation = translation_memory.get(clean_text, "")
string_fields[field] = {
"original": original_entry,
"translation": existing_translation
}
return string_fields
def _passes_filter(text, filter_type, filter_value):
"""Check if text passes the specified filter."""
if filter_type is None:
return True
elif filter_type == 'is_ascii':
return bool(STRICT_ASCII_FILTER_PATTERN.match(text))
elif filter_type == 'contains_text':
return filter_value in text
return True
def patch_database(input_file):
"""Apply translations from JSON file to the database.
Args:
input_file (str): Path to JSON file containing translations
"""
if not validate_required_files(REPO_MAP_FILE, input_file, DB_FILE):
return
print(f"--- PATCHING MODE: '{input_file}' -> '{DB_FILE}' ---")
# Confirm operation
response = input("Are you sure? A backup will be created. (yes/no): ").lower()
if response not in ['yes', 'y']:
print("Operation cancelled.")
return
# Create backup
print(f"Creating backup '{DB_BACKUP_FILE}'...")
shutil.copyfile(DB_FILE, DB_BACKUP_FILE)
# Load data
with open(REPO_MAP_FILE, 'r', encoding='utf-8') as f:
repo_map = {v['table_name']: v for v in json.load(f).values()}
with open(input_file, 'r', encoding='utf-8') as f:
translations = json.load(f)
# Analyze translation changes
changes_to_apply = _analyze_translation_changes(translations)
if not changes_to_apply:
print("No changes found to apply.")
return
print(f"Found {len(changes_to_apply)} rows to update.")
# Apply changes to database
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
try:
updated_count = _apply_database_changes(cursor, repo_map, changes_to_apply)
conn.commit()
print(f"\nSuccess! Updated {updated_count} database entries.")
except Exception as e:
conn.rollback()
print(f"ERROR during patching: {e}")
print("Database rolled back to original state.")
finally:
conn.close()
def _analyze_translation_changes(translations):
"""Analyze translation JSON and extract changes to apply.
Args:
translations (dict): Translation data from JSON file
Returns:
list: List of changes to apply, each containing table, row_id, and fields
"""
changes_to_apply = []
for table_name, table_data in translations.items():
for row_id_str, fields in table_data.items():
changed_fields = {}
for field, content in fields.items():
# Check if field has translation that differs from original
if (isinstance(content, dict) and 'original' in content and
content.get('translation') and
content['translation'] != (content['original'] if isinstance(content['original'], str)
else content['original'].get('text', ''))):
# Decode special characters
original_struct = content['original']
codes = original_struct.get('codes', []) if isinstance(original_struct, dict) else []
final_text = decode_special_chars(content['translation'], codes)
changed_fields[field] = final_text
if changed_fields:
changes_to_apply.append({
'table': table_name,
'row_id': int(row_id_str),
'fields': changed_fields
})
return changes_to_apply
def _apply_database_changes(cursor, repo_map, changes_to_apply):
"""Apply translation changes to database.
Args:
cursor: SQLite cursor
repo_map (dict): Repository mapping information
changes_to_apply (list): List of changes to apply
Returns:
int: Number of successfully updated entries
"""
updated_count = 0
skipped_tables = set()
for change in tqdm(changes_to_apply, desc="Applying changes"):
table_name = change['table']
row_id = change['row_id']
fields = change['fields']
# Skip if table not in repository map
if table_name not in repo_map:
if table_name not in skipped_tables:
print(f"\nWARNING: Table '{table_name}' not found in repository map. Skipping...")
skipped_tables.add(table_name)
continue
try:
repo_info = repo_map[table_name]
# Get schema class
module_path = SCHEMA_LOCATION_MAP.get(repo_info['blob_schema_class'])
if not module_path:
if table_name not in skipped_tables:
print(f"\nWARNING: Schema class '{repo_info['blob_schema_class']}' not found. Skipping table '{table_name}'...")
skipped_tables.add(table_name)
continue
schema_module = importlib.import_module(module_path)
schema_class = getattr(schema_module, repo_info['blob_schema_class'])
get_root_method = getattr(schema_class, f"GetRootAs{repo_info['blob_schema_class']}")
# Check if table exists in database
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table_name,)
)
if not cursor.fetchone():
if table_name not in skipped_tables:
print(f"\nWARNING: Table '{table_name}' does not exist in database. Skipping...")
skipped_tables.add(table_name)
continue
# Get and update data
cursor.execute(f'SELECT "{BLOB_COLUMN}" FROM "{table_name}" WHERE rowid = ?', (row_id,))
result = cursor.fetchone()
if not result or not result[0]:
continue
# Parse FlatBuffer data
fbs_obj = get_root_method(result[0], 0)
data_dict = flatbuffer_to_dict(fbs_obj)
data_dict.update(fields)
# Rebuild and save
builder = flatbuffers.Builder(1024)
new_offset = dict_to_flatbuffer(builder, data_dict, schema_class)
builder.Finish(new_offset)
cursor.execute(
f'UPDATE "{table_name}" SET "{BLOB_COLUMN}" = ? WHERE rowid = ?',
(bytes(builder.Output()), row_id)
)
updated_count += 1
except sqlite3.OperationalError as e:
# Handle SQL errors (e.g., table doesn't exist)
if table_name not in skipped_tables:
print(f"\nWARNING: Database error for table '{table_name}': {e}. Skipping...")
skipped_tables.add(table_name)
continue
except Exception as e:
# Handle other errors silently or log them
continue
return updated_count
# ============================================================================
# SETUP AND UTILITY FUNCTIONS
# ============================================================================
def setup_schema_from_csharp(csharp_file='types.cs', output_fbs='generated_schema.fbs'):
"""Parse C# files and generate FlatBuffers schema.
Args:
csharp_file (str): Path to C# file with type definitions
output_fbs (str): Output .fbs schema file path
"""
if not validate_required_files(csharp_file):
return
from unidecode import unidecode
print(f"Parsing C# file: {csharp_file}")
print("This may take a while for large files...")
# Type mapping
type_map = {
'long': 'long', 'ulong': 'ulong', 'int': 'int', 'uint': 'uint',
'short': 'short', 'ushort': 'ushort', 'float': 'float', 'double': 'double',
'bool': 'bool', 'string': 'string', 'byte': 'ubyte', 'sbyte': 'byte'
}
def sanitize(name):
return re.sub(r'[^A-Za-z0-9_.]', '_', unidecode(name))
def to_snake_case(name):
name = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1_\2', name)
name = re.sub(r'([a-z\d])([A-Z])', r'\1_\2', name)
return name.lower().replace('-', '_')
# Parse C# file
with open(csharp_file, 'r', encoding='utf-8') as f:
content = f.read()
# Extract namespace
ns_match = re.search(r'namespace\s+([\w.]+)', content)
namespace = ns_match.group(1) if ns_match else 'FlatData'
# Parse tables and enums
tables = {}
enums = {}
# Find all class/table definitions
table_pattern = re.compile(r'public\s+(?:sealed\s+)?class\s+(\w+)\s*{([^}]+)}', re.DOTALL)
for match in table_pattern.finditer(content):
name = match.group(1)
body = match.group(2)
# Skip non-table classes
if 'BaseExcelRepository' in body or 'BaseDBSchema' in body:
continue
fields = []
prop_pattern = re.compile(r'public\s+([\w.<>\[\]?]+)\s+(\w+)\s*{\s*get;\s*set;\s*}')
for prop_match in prop_pattern.finditer(body):
field_type = prop_match.group(1).replace('?', '')
field_name = to_snake_case(prop_match.group(2))
# Convert type
if field_type in type_map:
fbs_type = type_map[field_type]
elif field_type.startswith('List<'):
inner = field_type[5:-1].replace('?', '')
fbs_type = f"[{type_map.get(inner, sanitize(inner))}]"
else:
fbs_type = sanitize(field_type)
fields.append((field_name, fbs_type))
if fields:
tables[name] = fields
# Find enums
enum_pattern = re.compile(r'public\s+enum\s+(\w+)\s*{([^}]+)}', re.DOTALL)
for match in enum_pattern.finditer(content):
name = match.group(1)
body = match.group(2)
values = []
for line in body.split(','):
line = line.strip().split('=')[0].strip()
if line and not line.startswith('//'):
values.append(to_snake_case(line))
if values:
enums[name] = values
# Generate .fbs file
print(f"Generating schema file: {output_fbs}")
with open(output_fbs, 'w', encoding='utf-8') as f:
f.write(f"namespace {namespace};\n\n")
# Write enums
for enum_name, values in sorted(enums.items()):
f.write(f"enum {enum_name} : int {{\n")
for value in values:
f.write(f" {value},\n")
f.write("}\n\n")
# Write tables
for table_name, fields in sorted(tables.items()):
f.write(f"table {table_name} {{\n")
for field_name, field_type in fields:
f.write(f" {field_name}:{field_type};\n")
f.write("}\n\n")
print(f"Success! Generated {len(tables)} tables and {len(enums)} enums.")
def setup_repository_mapping(csharp_file='types.cs', output_json='repository_map.json'):
"""Create repository mapping from C# file.
Args:
csharp_file (str): Path to C# file
output_json (str): Output JSON mapping file
"""
if not validate_required_files(csharp_file):
return
print(f"Analyzing '{csharp_file}' to create repository mapping...")
# Parse patterns
repo_pattern = re.compile(
r'public class (\w+)\s*:\s*BaseExcelRepository<[^,]+,\s*([^,]+),\s*([^>]+)>'
)
db_schema_pattern = re.compile(r'public class (\w+)\s*:\s*BaseDBSchema')
prop_pattern = re.compile(r'public\s+([\w.<>\[\]?]+)\s+(\w+)\s*{\s*get;\s*set;\s*}')
repositories = OrderedDict()
db_schemas = OrderedDict()
current_schema = None
with open(csharp_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip().split('//')[0]
# Repository definition
repo_match = repo_pattern.search(line)
if repo_match:
repo_name = repo_match.group(1)
table_class = repo_match.group(2).strip()
schema_class = repo_match.group(3).strip()
repositories[repo_name] = {
'table_class': table_class,
'schema_class': schema_class
}
continue
# DB Schema definition
schema_match = db_schema_pattern.search(line)
if schema_match:
current_schema = schema_match.group(1)
db_schemas[current_schema] = {'properties': []}
continue
# Properties
if current_schema:
prop_match = prop_pattern.search(line)
if prop_match:
prop_type = prop_match.group(1)
prop_name = prop_match.group(2)
db_schemas[current_schema]['properties'].append({
'name': prop_name,
'type': prop_type
})
# Match repositories with schemas
mapping = OrderedDict()
for repo_name, repo_info in repositories.items():
table_class = repo_info['table_class']
schema_class = repo_info['schema_class']
if schema_class in db_schemas:
mapping[repo_name] = {
'table_name': table_class.replace('Excel', ''),
'blob_schema_class': schema_class,
'properties': db_schemas[schema_class]['properties']
}
# Save mapping
with open(output_json, 'w', encoding='utf-8') as f:
json.dump(mapping, f, indent=2, ensure_ascii=False)
print(f"Success! Created mapping with {len(mapping)} repositories.")
print(f"Mapping saved to: {output_json}")
def preprocess_flatbuffer_schema(input_fbs, output_fbs=None):
"""Preprocess FlatBuffer schema to rename Python reserved keywords.
Args:
input_fbs (str): Input schema file
output_fbs (str): Output schema file (if None, modifies in place)
"""
if not validate_required_files(input_fbs):
return
reserved = [
'self', 'class', 'def', 'return', 'import', 'from', 'as',
'if', 'elif', 'else', 'while', 'for', 'in', 'is', 'not',
'and', 'or', 'True', 'False', 'None', 'pass', 'break',
'continue', 'try', 'except', 'finally', 'raise', 'with',
'yield', 'lambda', 'global', 'nonlocal'
]
with open(input_fbs, 'r', encoding='utf-8') as f:
content = f.read()
modified = False
for keyword in reserved:
pattern = rf'\b({keyword})(\s*:\s*\w+)'
if re.search(pattern, content):
content = re.sub(pattern, rf'\1_\2', content)
modified = True
print(f" Renamed '{keyword}' -> '{keyword}_'")
output_file = output_fbs or input_fbs
with open(output_file, 'w', encoding='utf-8') as f:
f.write(content)
if modified:
print(f"Preprocessed schema saved to: {output_file}")
else:
print("No reserved keywords found in schema.")
def generate_flatbuffer_python(fbs_file, flatc_exe='flatc.exe', output_dir='.'):
"""Generate Python modules from FlatBuffer schema.
Args:
fbs_file (str): FlatBuffer schema file (.fbs)
flatc_exe (str): Path to flatc compiler
output_dir (str): Output directory for generated Python files
"""
if not validate_required_files(fbs_file, flatc_exe):
return
print(f"Generating Python modules from: {fbs_file}")
# Run flatc compiler
cmd = [
flatc_exe,
'--python',
'--gen-object-api',
'-o', output_dir,
fbs_file
]
result = os.system(' '.join(cmd))
if result == 0:
print("Success! Python modules generated.")
else:
print(f"ERROR: flatc failed with code {result}")
def fix_flatbuffer_reserved_names(directory='MX'):
"""Fix Python reserved keywords in generated FlatBuffer files.
Args:
directory (str): Directory containing generated Python files
"""
from pathlib import Path
if not os.path.exists(directory):
print(f"ERROR: Directory '{directory}' not found")
return
print(f"Scanning {directory} for reserved keyword issues...")
reserved_map = {'self': 'self_', 'class': 'class_', 'import': 'import_'}
fixed_count = 0
for py_file in Path(directory).rglob('*.py'):
try:
with open(py_file, 'r', encoding='utf-8') as f:
content = f.read()
original = content
for reserved, new_name in reserved_map.items():
# Fix parameter names
pattern = rf'(def __init__\([^)]*\n\s+self,\n(?:[^)]*\n)*?\s+){reserved}(\s*=)'
if re.search(pattern, content):
content = re.sub(pattern, rf'\1{new_name}\2', content)
content = content.replace(f'self.{reserved} = {reserved}', f'self.{new_name} = {new_name}')
print(f" Fixed: {py_file.name}")
if content != original:
with open(py_file, 'w', encoding='utf-8') as f:
f.write(content)
fixed_count += 1
except Exception as e:
print(f" ERROR in {py_file}: {e}")
print(f"\nFixed {fixed_count} file(s).")
# ============================================================================
# CSV EXPORT/IMPORT FUNCTIONS
# ============================================================================
def export_to_csv(json_file, csv_file):
"""Export JSON translation file to CSV format for translators."""
if not validate_required_files(json_file):
return
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"Exporting translations to '{csv_file}'...")
# Process all records
all_rows = []
text_groups = {}
group_id = 1
for table_name, table_data in data.items():
for row_id, row_data in table_data.items():
for field, content in row_data.items():
if isinstance(content, dict) and 'original' in content:
original = content['original']
text = original if isinstance(original, str) else original.get('text', '')
codes = [] if isinstance(original, str) else original.get('codes', [])
if text not in text_groups:
text_groups[text] = group_id
group_id += 1
all_rows.append([
text_groups[text], text, content.get('translation', ''),
table_name, row_id, field, 'Y' if codes else '',
json.dumps(codes) if codes else ''
])
# Write CSV files
_write_csv_files(csv_file, all_rows, text_groups)
print(f"Export completed! Unique texts: {len(text_groups)}")
def _write_csv_files(csv_file, all_rows, text_groups):
"""Write main CSV and simplified translator version."""
# Main CSV
with open(csv_file, 'w', encoding='utf-8-sig', newline='') as f:
writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_ALL)
writer.writerow(['GroupID', 'Original', 'Translation', 'SQLTable', 'RowID', 'Field', 'HasCodes', 'Codes'])
writer.writerows(sorted(all_rows, key=lambda x: x[0]))
# Simplified translator CSV
translator_csv = csv_file.replace('.csv', '_for_translators.csv')
unique_texts = {}
for row in all_rows:
text = row[1]
if text not in unique_texts:
unique_texts[text] = [text_groups[text], text, row[2], set()]
unique_texts[text][3].add(row[3])
with open(translator_csv, 'w', encoding='utf-8-sig', newline='') as f:
writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_ALL)
writer.writerow(['GroupID', 'Original', 'Translation', 'Tables'])
for text, info in sorted(unique_texts.items(), key=lambda x: x[1][0]):
writer.writerow([info[0], info[1], info[2], '|'.join(sorted(info[3]))])
print(f"Translator version: {translator_csv}")
def import_from_csv(csv_file, json_file, original_json_file=None):
"""Import translations from CSV file."""
if not original_json_file:
original_json_file = json_file
if not validate_required_files(csv_file, original_json_file):
return
# Load data
with open(original_json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Load translations
is_simple = '_for_translators' in csv_file
translations = {}
with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
reader = csv.reader(f, delimiter=';', quoting=csv.QUOTE_ALL)
next(reader) # Skip header
for row in reader:
if len(row) >= 3 and row[2]: # Has translation
if is_simple:
translations[row[1]] = row[2] # original -> translation
elif len(row) >= 6:
key = f"{row[3]}:{row[4]}:{row[5]}" # table:row:field
codes = json.loads(row[7]) if len(row) > 7 and row[7] else []
translations[key] = {'original': row[1], 'translation': row[2], 'codes': codes}
# Apply translations
updated_count = 0
for table_name, table_data in data.items():
for row_id_str, row_data in table_data.items():
for field, content in row_data.items():
if isinstance(content, dict) and 'original' in content:
original = content['original']
text = original if isinstance(original, str) else original.get('text', '')
new_translation = None
if is_simple and text in translations:
new_translation = translations[text]
elif not is_simple:
key = f"{table_name}:{row_id_str}:{field}"
if key in translations and translations[key]['original'] == text:
new_translation = translations[key]['translation']
if new_translation and new_translation != content.get('translation', ''):
content['translation'] = new_translation
updated_count += 1
# Save result
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Success! Updated {updated_count} translations.")
def validate_csv(csv_file):
"""Check CSV file for consistency and issues."""
if not validate_required_files(csv_file):
return
print(f"Validating '{csv_file}'...")
groups = {}
total_rows = 0
with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
reader = csv.reader(f, delimiter=';', quoting=csv.QUOTE_ALL)
next(reader) # Skip header
for row in reader:
if len(row) >= 3:
total_rows += 1
group_id, original, translation = row[0], row[1], row[2]
if group_id not in groups:
groups[group_id] = {'original': original, 'translations': set()}
if translation:
groups[group_id]['translations'].add(translation)
# Find issues
issues = [issue for group_id, info in groups.items()
if len(info['translations']) > 1
for issue in [{'group_id': group_id, 'original': info['original'],
'translations': list(info['translations'])}]]
translated_groups = sum(1 for info in groups.values() if info['translations'])
# Report results
print(f"\n=== VALIDATION RESULTS ===")
print(f"Total rows: {total_rows}")
print(f"Unique groups: {len(groups)}")
print(f"Translated groups: {translated_groups}")
print(f"Untranslated groups: {len(groups) - translated_groups}")
if issues:
print(f"\n=== FOUND ISSUES: {len(issues)} ===")
for issue in issues[:5]:
print(f"\nGroup {issue['group_id']}: {issue['original'][:50]}...")
print("Different translations:")
for trans in issue['translations']:
print(f" - {trans}")
if len(issues) > 5:
print(f"\n... and {len(issues) - 5} more issues")
# Save detailed report
report_file = csv_file.replace('.csv', '_issues.txt')
with open(report_file, 'w', encoding='utf-8') as f:
f.write("=== ISSUE REPORT ===\n\n")
for issue in issues:
f.write(f"Group {issue['group_id']}: {issue['original']}\n")
f.write("Different translations:\n")
for trans in issue['translations']:
f.write(f" - {trans}\n")
f.write("\n")
print(f"\nDetailed report saved to: {report_file}")
else:
print("\nNo issues found!")
def main():
"""Main CLI entry point for the BA-translator tool.
Provides commands for extracting, translating, and patching game strings
using CSV workflow for translators.
"""
parser = argparse.ArgumentParser(
description="Game localization tool for Blue Archive."
)
subparsers = parser.add_subparsers(
dest='command',
required=True,
help='Available commands'
)
# Extract command - extract strings from database to JSON
parser_extract = subparsers.add_parser(
'extract',
help='Extract translatable strings from database to JSON file.'
)
parser_extract.add_argument(
'--filter',
type=str,
help='Filter for extraction. Formats: is_ascii, table_name:TableName, contains_text:Word'
)
parser_extract.add_argument(
'--output',
type=str,
default=DEFAULT_JSON_FILE,
help=f'Output JSON file name (default: {DEFAULT_JSON_FILE})'
)
parser_extract.add_argument(
'--update-from',
type=str,
help='Path to existing JSON file to merge translations from.'
)
# Patch command - apply translations to database
parser_patch = subparsers.add_parser(
'patch',
help='Apply translations from JSON file to the database.'
)
parser_patch.add_argument(
'--input',
type=str,
default=DEFAULT_JSON_FILE,
help=f'Input JSON file name (default: {DEFAULT_JSON_FILE})'
)
# CSV export command - convert JSON to CSV for translators
parser_export_csv = subparsers.add_parser(
'export_csv',
help='Export JSON translations to CSV format for translators.'
)
parser_export_csv.add_argument(
'--input',
type=str,
default=DEFAULT_JSON_FILE,
help=f'Input JSON file (default: {DEFAULT_JSON_FILE})'
)
parser_export_csv.add_argument(
'--output',
type=str,
default='translations.csv',
help='Output CSV file (default: translations.csv)'
)
# CSV import command - convert CSV back to JSON
parser_import_csv = subparsers.add_parser(
'import_csv',
help='Import translations from CSV back to JSON format.'
)
parser_import_csv.add_argument(
'--input',
type=str,
default='translations.csv',
help='Input CSV file (default: translations.csv)'
)
parser_import_csv.add_argument(
'--output',
type=str,
default=DEFAULT_JSON_FILE,
help=f'Output JSON file (default: {DEFAULT_JSON_FILE})'
)
parser_import_csv.add_argument(
'--original',
type=str,
help='Original JSON file for structure reference (if different from output)'
)
# CSV validation command - check CSV for issues
parser_validate_csv = subparsers.add_parser(
'validate_csv',
help='Validate CSV file for consistency and translation issues.'
)
parser_validate_csv.add_argument(
'--input',
type=str,
default='translations.csv',
help='CSV file to validate (default: translations.csv)'
)
# Setup schema command - generate FlatBuffer schema from C#
parser_setup_schema = subparsers.add_parser(
'setup_schema',
help='Parse C# files and generate FlatBuffer schema (.fbs file).'
)
parser_setup_schema.add_argument(
'--csharp',
type=str,
default='types.cs',
help='Input C# file with type definitions (default: types.cs)'
)
parser_setup_schema.add_argument(
'--output',
type=str,
default='generated_schema.fbs',
help='Output .fbs schema file (default: generated_schema.fbs)'
)
# Setup mapping command - create repository mapping
parser_setup_mapping = subparsers.add_parser(
'setup_mapping',
help='Create repository mapping from C# files.'
)
parser_setup_mapping.add_argument(
'--csharp',
type=str,
default='types.cs',
help='Input C# file (default: types.cs)'
)
parser_setup_mapping.add_argument(
'--output',
type=str,
default='repository_map.json',
help='Output mapping JSON file (default: repository_map.json)'
)
# Generate FlatBuffers command - generate Python modules
parser_gen_fb = subparsers.add_parser(
'generate_flatbuffers',
help='Generate Python modules from FlatBuffer schema with preprocessing.'
)
parser_gen_fb.add_argument(
'--schema',
type=str,
default='generated_schema.fbs',
help='Input .fbs schema file (default: generated_schema.fbs)'
)
parser_gen_fb.add_argument(
'--flatc',
type=str,
default='flatc.exe',
help='Path to flatc compiler (default: flatc.exe)'
)
parser_gen_fb.add_argument(
'--no-preprocess',
action='store_true',
help='Skip preprocessing (fixing reserved keywords)'
)
# Fix reserved names command - fix generated Python files
parser_fix_names = subparsers.add_parser(
'fix_reserved_names',
help='Fix Python reserved keywords in generated FlatBuffer files.'
)
parser_fix_names.add_argument(
'--directory',
type=str,
default='MX',
help='Directory with generated Python files (default: MX)'
)
# Parse arguments and execute appropriate command
args = parser.parse_args()
try:
if args.command == 'extract':
extract_strings(args.output, args.filter, args.update_from)
elif args.command == 'patch':
patch_database(args.input)
elif args.command == 'export_csv':
export_to_csv(args.input, args.output)
elif args.command == 'import_csv':
import_from_csv(args.input, args.output, args.original)
elif args.command == 'validate_csv':
validate_csv(args.input)
elif args.command == 'setup_schema':
setup_schema_from_csharp(args.csharp, args.output)
elif args.command == 'setup_mapping':
setup_repository_mapping(args.csharp, args.output)
elif args.command == 'generate_flatbuffers':
if not args.no_preprocess:
print("Preprocessing schema to fix reserved keywords...")
preprocess_flatbuffer_schema(args.schema)
generate_flatbuffer_python(args.schema, args.flatc)
elif args.command == 'fix_reserved_names':
fix_flatbuffer_reserved_names(args.directory)
else:
print(f"ERROR: Unknown command '{args.command}'")
parser.print_help()
except KeyboardInterrupt:
print("\nOperation cancelled by user.")
except Exception as e:
print(f"ERROR: {str(e)}")
return 1
return 0
if __name__ == "__main__":
main()