feat: Initial release - Unofficial Blue Archive translation toolkit

This commit is contained in:
Imp0ssibl33z
2025-08-31 16:50:56 +03:00
parent 45cae5a8c3
commit e373b095b1
8 changed files with 1945 additions and 1 deletions

894
BAtranslator.py Normal file
View File

@@ -0,0 +1,894 @@
#!/usr/bin/env python3
"""
BA Translator - A tool for extracting and applying translations for Blue Archive.
"""
import sys
import os
import sqlite3
import json
from collections import OrderedDict, defaultdict
import importlib
import shutil
import re
import argparse
from tqdm import tqdm
import flatbuffers
import csv
from pathlib import Path
# Add script directory to path for schema imports
script_dir = Path(__file__).parent.absolute()
if str(script_dir) not in sys.path:
sys.path.append(str(script_dir))
# Global configuration
DB_FILE = 'ExcelDB.db'
DB_BACKUP_FILE = 'ExcelDB.db.bak'
DEFAULT_JSON_FILE = 'translations.json'
REPO_MAP_FILE = 'repository_map.json'
BLOB_COLUMN = 'Bytes'
# Schema location mapping
SCHEMA_LOCATION_MAP = {}
def build_schema_map():
"""Build a mapping of schema class names to their module paths."""
for root, _, files in os.walk('.'):
for filename in files:
if filename.endswith('.py') and filename != '__init__.py':
class_name = filename[:-3]
relative_path = os.path.relpath(root, '.')
if relative_path == '.':
module_path = class_name
else:
module_path = os.path.normpath(relative_path).replace(os.sep, '.') + '.' + class_name
SCHEMA_LOCATION_MAP[class_name] = module_path
build_schema_map()
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
# Patterns for special character handling
CONTROL_CHAR_PATTERN = re.compile(r'[\x00-\x1f\x7F-\x9F]+')
STRICT_ASCII_FILTER_PATTERN = re.compile(r'^[\s\x21-\x7E{}:]*$')
def validate_required_files(*file_paths):
"""Validate that all required files exist.
Args:
*file_paths: Variable number of file paths to validate
Returns:
bool: True if all files exist, False otherwise
"""
missing_files = []
for file_path in file_paths:
if not os.path.exists(file_path):
missing_files.append(file_path)
if missing_files:
for file_path in missing_files:
print(f"ERROR: Required file '{file_path}' not found.")
return False
return True
def encode_special_chars(text):
"""Encode special control characters in text for safe JSON storage.
Args:
text (str): Input text that may contain control characters
Returns:
tuple: (clean_text, codes_list) where codes_list contains the encoded chars
"""
if not text:
return text, []
codes = CONTROL_CHAR_PATTERN.findall(text)
if not codes:
return text, []
def replacer_func(match):
index = len(replacer_func.codes)
replacer_func.codes.append(match.group(0))
return f"{{C:{index}}}"
replacer_func.codes = []
clean_text = CONTROL_CHAR_PATTERN.sub(replacer_func, text)
return clean_text, replacer_func.codes
def decode_special_chars(clean_text, codes):
"""Decode special characters back into text.
Args:
clean_text (str): Text with encoded placeholders
codes (list): List of original control characters
Returns:
str: Text with control characters restored
"""
if not clean_text or not codes:
return clean_text
placeholder_pattern = re.compile(r'{C:(\d+)}')
def replacer_func(match):
index = int(match.group(1))
if index < len(codes):
return codes[index]
return match.group(0)
return placeholder_pattern.sub(replacer_func, clean_text)
def flatbuffer_to_dict(obj):
"""Recursively convert a FlatBuffer object to a Python dictionary.
Args:
obj: FlatBuffer object to convert
Returns:
dict or primitive: Converted object
"""
if obj is None or isinstance(obj, (int, float, bool, str)):
return obj
if isinstance(obj, bytes):
return obj.decode('utf-8', 'ignore')
result = OrderedDict()
# Get all public methods that look like FlatBuffer accessors
for method_name in dir(obj):
if not method_name[0].isupper():
continue
method = getattr(obj, method_name)
if not callable(method) or method.__code__.co_argcount != 1:
continue
try:
value = method()
# Handle array-like values
if hasattr(value, 'Length') and callable(getattr(value, 'Length')):
result[method_name] = [
flatbuffer_to_dict(value(i))
for i in range(value.Length())
]
else:
result[method_name] = flatbuffer_to_dict(value)
except Exception:
# Skip methods that fail to call
continue
return result
def dict_to_flatbuffer(builder, data_dict, schema_class):
"""Build a FlatBuffer from a dictionary using the schema class.
Args:
builder: FlatBuffer builder instance
data_dict (dict): Data to serialize
schema_class: FlatBuffer schema class
Returns:
int: Offset of the created object
"""
schema_name = schema_class.__name__
schema_module = sys.modules[schema_class.__module__]
# Determine field order from the schema
field_order = []
if hasattr(schema_class, f'GetRootAs{schema_name}'):
add_methods = [
m for m in dir(schema_module)
if m.startswith(f"{schema_name}Add")
]
field_order = [
m.replace(f"{schema_name}Add", "")
for m in reversed(add_methods)
]
# Pre-create string pointers
string_pointers = {}
for key, value in data_dict.items():
if isinstance(value, str):
string_pointers[key] = builder.CreateString(value)
# Start building the object
start_method = getattr(schema_module, f"{schema_name}Start")
start_method(builder)
# Add fields in the correct order
for field_name in field_order:
if field_name in data_dict:
add_method = getattr(schema_module, f"{schema_name}Add{field_name}")
value = data_dict[field_name]
if field_name in string_pointers:
add_method(builder, string_pointers[field_name])
else:
add_method(builder, value)
# Finish building
end_method = getattr(schema_module, f"{schema_name}End")
return end_method(builder)
def create_translation_memory(old_json_data):
"""Create translation memory from existing JSON data.
Args:
old_json_data (dict): Previously translated data
Returns:
dict: Mapping of original text to translations
"""
memory = {}
print("Creating translation memory from existing data...")
for table_data in old_json_data.values():
for row_data in table_data.values():
for field_content in row_data.values():
if not isinstance(field_content, dict):
continue
if 'original' not in field_content or 'translation' not in field_content:
continue
original_struct = field_content['original']
translation_text = field_content['translation']
# Extract original text
if isinstance(original_struct, str):
original_text = original_struct
else:
original_text = original_struct.get('text')
# Store translation if it exists and differs from original
if translation_text and translation_text != original_text:
memory[original_text] = translation_text
print(f"Translation memory created with {len(memory)} unique translations.")
return memory
# ============================================================================
# MAIN FUNCTIONS
# ============================================================================
def extract_strings(output_file, filter_str=None, update_from=None):
"""Extract translatable strings from the database.
Args:
output_file (str): Path to output JSON file
filter_str (str): Optional filter (e.g., 'is_ascii', 'table_name:TableName')
update_from (str): Path to existing JSON file to merge translations from
"""
# Validate required files
if not validate_required_files(REPO_MAP_FILE, DB_FILE):
return
# Load existing translations if specified
translation_memory = {}
if update_from:
if os.path.exists(update_from):
with open(update_from, 'r', encoding='utf-8') as f:
old_data = json.load(f)
translation_memory = create_translation_memory(old_data)
else:
print(f"WARNING: Update file '{update_from}' not found.")
# Parse filter
filter_type, filter_value = _parse_filter(filter_str)
# Load repository mapping and connect to database
with open(REPO_MAP_FILE, 'r', encoding='utf-8') as f:
repo_map = json.load(f)
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
translations_dict = OrderedDict()
try:
print("Extracting translatable strings...")
for repo_info in tqdm(repo_map.values(), desc="Processing repositories"):
table_name = repo_info['table_name']
# Apply table filter
if filter_type == 'table_name' and table_name != filter_value:
continue
table_translations = _process_table(cursor, repo_info, filter_type, filter_value, translation_memory)
if table_translations:
translations_dict[table_name] = table_translations
finally:
conn.close()
if not translations_dict:
print("No strings found matching the filter.")
return
# Save results
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(translations_dict, f, indent=2, ensure_ascii=False)
print(f"\nSuccess! Translation data saved to '{output_file}'.")
def _parse_filter(filter_str):
"""Parse filter string into type and value."""
if not filter_str:
return None, None
if ':' in filter_str:
filter_type, filter_value = filter_str.split(':', 1)
else:
filter_type, filter_value = filter_str, None
print(f"Applying filter: type='{filter_type}', value='{filter_value}'")
return filter_type, filter_value
def _process_table(cursor, repo_info, filter_type, filter_value, translation_memory):
"""Process a single table and extract translatable strings."""
table_name = repo_info['table_name']
blob_schema_name = repo_info['blob_schema_class']
try:
# Load schema module
module_path = SCHEMA_LOCATION_MAP.get(blob_schema_name)
if not module_path:
return None
schema_module = importlib.import_module(module_path)
schema_class = getattr(schema_module, blob_schema_name)
get_root_method = getattr(schema_class, f"GetRootAs{blob_schema_name}")
# Process table rows
cursor.execute(f'SELECT rowid, "{BLOB_COLUMN}" FROM "{table_name}"')
table_translations = OrderedDict()
for row_id, blob_data in cursor.fetchall():
if not blob_data:
continue
try:
# Parse FlatBuffer and extract strings
fbs_obj = get_root_method(blob_data, 0)
data_dict = flatbuffer_to_dict(fbs_obj)
string_fields = _extract_string_fields(data_dict, filter_type, filter_value, translation_memory)
if string_fields:
table_translations[row_id] = string_fields
except Exception:
# Skip rows that can't be processed
continue
return table_translations if table_translations else None
except (ImportError, AttributeError):
# Skip tables that can't be processed
return None
def _extract_string_fields(data_dict, filter_type, filter_value, translation_memory):
"""Extract and filter string fields from FlatBuffer data."""
string_fields = OrderedDict()
for field, value in data_dict.items():
if not isinstance(value, str) or not value:
continue
clean_text, codes = encode_special_chars(value)
# Apply content filters
if not _passes_filter(clean_text, filter_type, filter_value):
continue
# Create original entry
original_entry = {"text": clean_text, "codes": codes} if codes else clean_text
# Get existing translation
existing_translation = translation_memory.get(clean_text, "")
string_fields[field] = {
"original": original_entry,
"translation": existing_translation
}
return string_fields
def _passes_filter(text, filter_type, filter_value):
"""Check if text passes the specified filter."""
if filter_type is None:
return True
elif filter_type == 'is_ascii':
return bool(STRICT_ASCII_FILTER_PATTERN.match(text))
elif filter_type == 'contains_text':
return filter_value in text
return True
def patch_database(input_file):
"""Apply translations from JSON file to the database.
Args:
input_file (str): Path to JSON file containing translations
"""
# Validate files
if not validate_required_files(REPO_MAP_FILE, input_file, DB_FILE):
return
print(f"--- PATCHING MODE: '{input_file}' -> '{DB_FILE}' ---")
# Confirm operation
response = input("Are you sure? A backup will be created. (yes/no): ").lower()
if response not in ['yes', 'y']:
print("Operation cancelled.")
return
# Create backup
print(f"Creating backup '{DB_BACKUP_FILE}'...")
shutil.copyfile(DB_FILE, DB_BACKUP_FILE)
# Load data
with open(REPO_MAP_FILE, 'r', encoding='utf-8') as f:
repo_map = {v['table_name']: v for v in json.load(f).values()}
with open(input_file, 'r', encoding='utf-8') as f:
translations = json.load(f)
# Process changes
print("Analyzing and applying translations...")
changes_to_apply = _analyze_translation_changes(translations)
if not changes_to_apply:
print("No changes found to apply.")
return
print(f"Found {len(changes_to_apply)} rows to update.")
# Apply changes to database
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
try:
updated_count = _apply_database_changes(cursor, repo_map, changes_to_apply)
conn.commit()
print(f"\nSuccess! Updated {updated_count} database entries.")
except Exception as e:
conn.rollback()
print(f"ERROR during patching: {e}")
print("Database rolled back to original state.")
finally:
conn.close()
def patch_database(input_file):
"""Apply translations from JSON file to the database."""
if not validate_required_files(REPO_MAP_FILE, input_file, DB_FILE):
return
print(f"--- PATCHING MODE: '{input_file}' -> '{DB_FILE}' ---")
# Confirm operation
response = input("Are you sure? A backup will be created. (yes/no): ").lower()
if response not in ['yes', 'y']:
print("Operation cancelled.")
return
# Create backup
print(f"Creating backup '{DB_BACKUP_FILE}'...")
shutil.copyfile(DB_FILE, DB_BACKUP_FILE)
# Load data
with open(REPO_MAP_FILE, 'r', encoding='utf-8') as f:
repo_map = {v['table_name']: v for v in json.load(f).values()}
with open(input_file, 'r', encoding='utf-8') as f:
translations = json.load(f)
# Find changes to apply
changes_to_apply = []
for table_name, table_data in translations.items():
for row_id_str, fields in table_data.items():
changed_fields = {}
for field, content in fields.items():
if (isinstance(content, dict) and 'original' in content and
content.get('translation') and
content['translation'] != (content['original'] if isinstance(content['original'], str)
else content['original'].get('text', ''))):
# Decode special characters
original_struct = content['original']
codes = original_struct.get('codes', []) if isinstance(original_struct, dict) else []
final_text = decode_special_chars(content['translation'], codes)
changed_fields[field] = final_text
if changed_fields:
changes_to_apply.append({
'table': table_name,
'row_id': int(row_id_str),
'fields': changed_fields
})
if not changes_to_apply:
print("No changes found to apply.")
return
print(f"Found {len(changes_to_apply)} records to update.")
# Apply changes
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
updated_count = 0
try:
for change in tqdm(changes_to_apply, desc="Applying changes"):
table_name = change['table']
row_id = change['row_id']
fields = change['fields']
if table_name not in repo_map:
continue
try:
repo_info = repo_map[table_name]
module_path = SCHEMA_LOCATION_MAP.get(repo_info['blob_schema_class'])
if not module_path:
continue
schema_module = importlib.import_module(module_path)
schema_class = getattr(schema_module, repo_info['blob_schema_class'])
get_root_method = getattr(schema_class, f"GetRootAs{repo_info['blob_schema_class']}")
# Get and update data
cursor.execute(f'SELECT "{BLOB_COLUMN}" FROM "{table_name}" WHERE rowid = ?', (row_id,))
result = cursor.fetchone()
if not result or not result[0]:
continue
fbs_obj = get_root_method(result[0], 0)
data_dict = flatbuffer_to_dict(fbs_obj)
data_dict.update(fields)
# Rebuild and save
builder = flatbuffers.Builder(1024)
new_offset = dict_to_flatbuffer(builder, data_dict, schema_class)
builder.Finish(new_offset)
cursor.execute(f'UPDATE "{table_name}" SET "{BLOB_COLUMN}" = ? WHERE rowid = ?',
(bytes(builder.Output()), row_id))
updated_count += 1
except Exception:
continue
conn.commit()
print(f"\nSuccess! Updated {updated_count} database entries.")
except Exception as e:
conn.rollback()
print(f"ERROR during patching: {e}")
finally:
conn.close()
# ============================================================================
# CSV EXPORT/IMPORT FUNCTIONS
# ============================================================================
def export_to_csv(json_file, csv_file):
"""Export JSON translation file to CSV format for translators."""
if not validate_required_files(json_file):
return
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"Exporting translations to '{csv_file}'...")
# Process all records
all_rows = []
text_groups = {}
group_id = 1
for table_name, table_data in data.items():
for row_id, row_data in table_data.items():
for field, content in row_data.items():
if isinstance(content, dict) and 'original' in content:
original = content['original']
text = original if isinstance(original, str) else original.get('text', '')
codes = [] if isinstance(original, str) else original.get('codes', [])
if text not in text_groups:
text_groups[text] = group_id
group_id += 1
all_rows.append([
text_groups[text], text, content.get('translation', ''),
table_name, row_id, field, 'Y' if codes else '',
json.dumps(codes) if codes else ''
])
# Write CSV files
_write_csv_files(csv_file, all_rows, text_groups)
print(f"Export completed! Unique texts: {len(text_groups)}")
def _write_csv_files(csv_file, all_rows, text_groups):
"""Write main CSV and simplified translator version."""
# Main CSV
with open(csv_file, 'w', encoding='utf-8-sig', newline='') as f:
writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_ALL)
writer.writerow(['GroupID', 'Original', 'Translation', 'SQLTable', 'RowID', 'Field', 'HasCodes', 'Codes'])
writer.writerows(sorted(all_rows, key=lambda x: x[0]))
# Simplified translator CSV
translator_csv = csv_file.replace('.csv', '_for_translators.csv')
unique_texts = {}
for row in all_rows:
text = row[1]
if text not in unique_texts:
unique_texts[text] = [text_groups[text], text, row[2], set()]
unique_texts[text][3].add(row[3])
with open(translator_csv, 'w', encoding='utf-8-sig', newline='') as f:
writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_ALL)
writer.writerow(['GroupID', 'Original', 'Translation', 'Tables'])
for text, info in sorted(unique_texts.items(), key=lambda x: x[1][0]):
writer.writerow([info[0], info[1], info[2], '|'.join(sorted(info[3]))])
print(f"Translator version: {translator_csv}")
def import_from_csv(csv_file, json_file, original_json_file=None):
"""Import translations from CSV file."""
if not original_json_file:
original_json_file = json_file
if not validate_required_files(csv_file, original_json_file):
return
# Load data
with open(original_json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Load translations
is_simple = '_for_translators' in csv_file
translations = {}
with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
reader = csv.reader(f, delimiter=';', quoting=csv.QUOTE_ALL)
next(reader) # Skip header
for row in reader:
if len(row) >= 3 and row[2]: # Has translation
if is_simple:
translations[row[1]] = row[2] # original -> translation
elif len(row) >= 6:
key = f"{row[3]}:{row[4]}:{row[5]}" # table:row:field
codes = json.loads(row[7]) if len(row) > 7 and row[7] else []
translations[key] = {'original': row[1], 'translation': row[2], 'codes': codes}
# Apply translations
updated_count = 0
for table_name, table_data in data.items():
for row_id_str, row_data in table_data.items():
for field, content in row_data.items():
if isinstance(content, dict) and 'original' in content:
original = content['original']
text = original if isinstance(original, str) else original.get('text', '')
new_translation = None
if is_simple and text in translations:
new_translation = translations[text]
elif not is_simple:
key = f"{table_name}:{row_id_str}:{field}"
if key in translations and translations[key]['original'] == text:
new_translation = translations[key]['translation']
if new_translation and new_translation != content.get('translation', ''):
content['translation'] = new_translation
updated_count += 1
# Save result
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Success! Updated {updated_count} translations.")
def validate_csv(csv_file):
"""Check CSV file for consistency and issues."""
if not validate_required_files(csv_file):
return
print(f"Validating '{csv_file}'...")
groups = {}
total_rows = 0
with open(csv_file, 'r', encoding='utf-8-sig', newline='') as f:
reader = csv.reader(f, delimiter=';', quoting=csv.QUOTE_ALL)
next(reader) # Skip header
for row in reader:
if len(row) >= 3:
total_rows += 1
group_id, original, translation = row[0], row[1], row[2]
if group_id not in groups:
groups[group_id] = {'original': original, 'translations': set()}
if translation:
groups[group_id]['translations'].add(translation)
# Find issues
issues = [issue for group_id, info in groups.items()
if len(info['translations']) > 1
for issue in [{'group_id': group_id, 'original': info['original'],
'translations': list(info['translations'])}]]
translated_groups = sum(1 for info in groups.values() if info['translations'])
# Report results
print(f"\n=== VALIDATION RESULTS ===")
print(f"Total rows: {total_rows}")
print(f"Unique groups: {len(groups)}")
print(f"Translated groups: {translated_groups}")
print(f"Untranslated groups: {len(groups) - translated_groups}")
if issues:
print(f"\n=== FOUND ISSUES: {len(issues)} ===")
for issue in issues[:5]:
print(f"\nGroup {issue['group_id']}: {issue['original'][:50]}...")
print("Different translations:")
for trans in issue['translations']:
print(f" - {trans}")
if len(issues) > 5:
print(f"\n... and {len(issues) - 5} more issues")
# Save detailed report
report_file = csv_file.replace('.csv', '_issues.txt')
with open(report_file, 'w', encoding='utf-8') as f:
f.write("=== ISSUE REPORT ===\n\n")
for issue in issues:
f.write(f"Group {issue['group_id']}: {issue['original']}\n")
f.write("Different translations:\n")
for trans in issue['translations']:
f.write(f" - {trans}\n")
f.write("\n")
print(f"\nDetailed report saved to: {report_file}")
else:
print("\nNo issues found!")
def main():
"""Main CLI entry point for the BA-translator tool.
Provides commands for extracting, translating, and patching game strings
using CSV workflow for translators.
"""
parser = argparse.ArgumentParser(
description="Game localization tool for Blue Archive."
)
subparsers = parser.add_subparsers(
dest='command',
required=True,
help='Available commands'
)
# Extract command - extract strings from database to JSON
parser_extract = subparsers.add_parser(
'extract',
help='Extract translatable strings from database to JSON file.'
)
parser_extract.add_argument(
'--filter',
type=str,
help='Filter for extraction. Formats: is_ascii, table_name:TableName, contains_text:Word'
)
parser_extract.add_argument(
'--output',
type=str,
default=DEFAULT_JSON_FILE,
help=f'Output JSON file name (default: {DEFAULT_JSON_FILE})'
)
parser_extract.add_argument(
'--update-from',
type=str,
help='Path to existing JSON file to merge translations from.'
)
# Patch command - apply translations to database
parser_patch = subparsers.add_parser(
'patch',
help='Apply translations from JSON file to the database.'
)
parser_patch.add_argument(
'--input',
type=str,
default=DEFAULT_JSON_FILE,
help=f'Input JSON file name (default: {DEFAULT_JSON_FILE})'
)
# CSV export command - convert JSON to CSV for translators
parser_export_csv = subparsers.add_parser(
'export_csv',
help='Export JSON translations to CSV format for translators.'
)
parser_export_csv.add_argument(
'--input',
type=str,
default=DEFAULT_JSON_FILE,
help=f'Input JSON file (default: {DEFAULT_JSON_FILE})'
)
parser_export_csv.add_argument(
'--output',
type=str,
default='translations.csv',
help='Output CSV file (default: translations.csv)'
)
# CSV import command - convert CSV back to JSON
parser_import_csv = subparsers.add_parser(
'import_csv',
help='Import translations from CSV back to JSON format.'
)
parser_import_csv.add_argument(
'--input',
type=str,
default='translations.csv',
help='Input CSV file (default: translations.csv)'
)
parser_import_csv.add_argument(
'--output',
type=str,
default=DEFAULT_JSON_FILE,
help=f'Output JSON file (default: {DEFAULT_JSON_FILE})'
)
parser_import_csv.add_argument(
'--original',
type=str,
help='Original JSON file for structure reference (if different from output)'
)
# CSV validation command - check CSV for issues
parser_validate_csv = subparsers.add_parser(
'validate_csv',
help='Validate CSV file for consistency and translation issues.'
)
parser_validate_csv.add_argument(
'--input',
type=str,
default='translations.csv',
help='CSV file to validate (default: translations.csv)'
)
# Parse arguments and execute appropriate command
args = parser.parse_args()
try:
if args.command == 'extract':
extract_strings(args.output, args.filter, args.update_from)
elif args.command == 'patch':
patch_database(args.input)
elif args.command == 'export_csv':
export_to_csv(args.input, args.output)
elif args.command == 'import_csv':
import_from_csv(args.input, args.output, args.original)
elif args.command == 'validate_csv':
validate_csv(args.input)
else:
print(f"ERROR: Unknown command '{args.command}'")
parser.print_help()
except KeyboardInterrupt:
print("\nOperation cancelled by user.")
except Exception as e:
print(f"ERROR: {str(e)}")
return 1
return 0
if __name__ == "__main__":
main()