From 78be2943520a91da441438d3d96c1c4a82db7880 Mon Sep 17 00:00:00 2001
From: Imp0ssibl33z
Date: Sat, 11 Oct 2025 13:25:39 +0300
Subject: [PATCH] Fixes

---
 BAtranslator.py | 513 ++++++++++++++++++++++++++++++++++++------------
 1 file changed, 383 insertions(+), 130 deletions(-)

diff --git a/BAtranslator.py b/BAtranslator.py
index 5b44881..d150479 100644
--- a/BAtranslator.py
+++ b/BAtranslator.py
@@ -640,7 +640,7 @@ def _apply_database_changes(cursor, repo_map, changes_to_apply):
 # ============================================================================
 
 def setup_schema_from_csharp(csharp_file='types.cs', output_fbs='generated_schema.fbs'):
-    """Parse C# files and generate FlatBuffers schema.
+    """Parse C# files and generate a FlatBuffers schema using an improved parser.
 
     Args:
         csharp_file (str): Path to C# file with type definitions
@@ -650,179 +650,432 @@ def setup_schema_from_csharp(csharp_file='types.cs', output_fbs='generated_schem
         return
 
     from unidecode import unidecode
+    from collections import defaultdict
 
     print(f"Parsing C# file: {csharp_file}")
     print("This may take a while for large files...")
 
-    # Type mapping
-    type_map = {
+    # Configuration
+    DEFAULT_NAMESPACE = 'FlatData'
+
+    # Type mapping from C# to FlatBuffers
+    CSHARP_TO_FBS_TYPE_MAP = {
         'long': 'long', 'ulong': 'ulong', 'int': 'int', 'uint': 'uint',
         'short': 'short', 'ushort': 'ushort', 'float': 'float',
         'double': 'double', 'bool': 'bool', 'string': 'string',
         'byte': 'ubyte', 'sbyte': 'byte'
     }
 
-    def sanitize(name):
+    def sanitize_identifier(name):
+        """Clean identifier names for FlatBuffers compatibility."""
         return re.sub(r'[^A-Za-z0-9_.]', '_', unidecode(name))
 
-    def to_snake_case(name):
+    def pascal_to_snake_case(name):
+        """Convert PascalCase to snake_case."""
        name = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1_\2', name)
         name = re.sub(r'([a-z\d])([A-Z])', r'\1_\2', name)
-        return name.lower().replace('-', '_')
+        name = name.replace('-', '_')
+        return name.lower()
+
+    def csharp_to_fbs_type(csharp_type):
+        """Convert a C# type to its FlatBuffers equivalent."""
+        if csharp_type is None:
+            return 'int'
+
+        # Remove nullable indicators
+        csharp_type = csharp_type.replace('?', '')
+
+        # Check direct mappings
+        if csharp_type in CSHARP_TO_FBS_TYPE_MAP:
+            return CSHARP_TO_FBS_TYPE_MAP[csharp_type]
+
+        # Handle custom types
+        return sanitize_identifier(csharp_type)
+
+    def find_full_type_name(base_type, current_ns, all_defs):
+        """Find the fully qualified name for a type reference."""
+        # Try current namespace first
+        if f"{current_ns}.{base_type}" in all_defs:
+            return f"{current_ns}.{base_type}"
+
+        # Try default namespace
+        if f"{DEFAULT_NAMESPACE}.{base_type}" in all_defs:
+            return f"{DEFAULT_NAMESPACE}.{base_type}"
+
+        # Try global scope
+        if base_type in all_defs:
+            return base_type
+
+        # Search in all namespaces
+        for name in all_defs:
+            if name.endswith(f".{base_type}"):
+                return name
+
+        return None
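+
+    # Illustrative example of the lookup order above (hypothetical type names):
+    # for base_type='ItemExcel' with current_ns='Battle', the helper checks
+    #   1. 'Battle.ItemExcel'    (current namespace)
+    #   2. 'FlatData.ItemExcel'  (DEFAULT_NAMESPACE)
+    #   3. 'ItemExcel'           (global scope)
+    #   4. the first key ending in '.ItemExcel' (any namespace)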
+
+    # Parse C# file and extract table and enum definitions
+    all_definitions = {}
 
-    # Parse C# file
     with open(csharp_file, 'r', encoding='utf-8') as f:
-        content = f.read()
-
-    # Extract namespace
-    ns_match = re.search(r'namespace\s+([\w.]+)', content)
-    namespace = ns_match.group(1) if ns_match else 'FlatData'
-
-    # Parse tables and enums
-    tables = {}
-    enums = {}
-
-    # Find all class/table definitions
-    table_pattern = re.compile(r'public\s+(?:sealed\s+)?class\s+(\w+)\s*{([^}]+)}', re.DOTALL)
-    for match in table_pattern.finditer(content):
-        name = match.group(1)
-        body = match.group(2)
+        current_namespace = "_GLOBAL_"
+        in_block = None
+        current_name = None
+        current_fields = []
+        current_enum_base_type = 'int'
+        seen_enum_values = set()
 
-        # Skip non-table classes
-        if 'BaseExcelRepository' in body or 'BaseDBSchema' in body:
+        print("Parsing C# file line by line...")
+        line_count = 0
+
+        for line in f:
+            line_count += 1
+            if line_count % 100000 == 0:
+                print(f"  Processed {line_count:,} lines...")
+
+            line = line.strip()
+
+            # Parse namespace declarations
+            ns_match = re.match(r'namespace (\S+)', line)
+            if ns_match:
+                current_namespace = ns_match.group(1).replace(';', '')
+                continue
+
+            # End of block
+            if line.startswith('}') and in_block:
+                full_name = f"{current_namespace}.{current_name}"
+                if in_block == 'table':
+                    all_definitions[full_name] = {
+                        'type': 'table',
+                        'ns': current_namespace,
+                        'name': current_name,
+                        'fields': current_fields
+                    }
+                elif in_block == 'enum':
+                    all_definitions[full_name] = {
+                        'type': 'enum',
+                        'ns': current_namespace,
+                        'name': current_name,
+                        'base': current_enum_base_type,
+                        'fields': current_fields
+                    }
+
+                in_block = None
+                current_fields = []
+                continue
+
+            # Parse struct/table definitions
+            table_match = re.search(r'public struct (\w+) : IFlatbufferObject', line)
+            enum_match = re.search(r'public enum (\w+)(?:\s*:\s*(\w+))?', line)
+
+            if table_match:
+                in_block = 'table'
+                current_name = sanitize_identifier(table_match.group(1))
+                continue
+            elif enum_match:
+                in_block = 'enum'
+                current_name = sanitize_identifier(enum_match.group(1))
+                csharp_base_type = enum_match.group(2)
+                current_enum_base_type = csharp_to_fbs_type(csharp_base_type)
+                seen_enum_values.clear()
+                continue
+
+            if not in_block:
+                continue
+
+            # Parse enum fields
+            if in_block == 'enum':
+                field_match = re.match(r'(\w+)\s*=\s*(-?\d+)', line)
+                if field_match:
+                    field_name = sanitize_identifier(field_match.group(1))
+                    field_value = int(field_match.group(2))
+
+                    if field_value not in seen_enum_values:
+                        seen_enum_values.add(field_value)
+                        current_fields.append(f'{field_name} = {field_value}')
+                continue
+
+            # Parse table fields
+            if in_block == 'table':
+                if not line.startswith('public'):
+                    continue
+
+                # Parse vector methods
+                vec_match = re.search(
+                    r'public\s+(?:[^\s<]+<(\S+)>|(\S+))\s+(\w+)\s*\(int\s+\w+\)',
+                    line
+                )
+                if vec_match:
+                    csharp_type = vec_match.group(1) if vec_match.group(1) else vec_match.group(2)
+                    fbs_type = csharp_to_fbs_type(csharp_type)
+                    current_fields.append({
+                        'original': sanitize_identifier(vec_match.group(3)),
+                        'type': f'[{fbs_type}]'
+                    })
+                    continue
+
+                # Parse property definitions
+                prop_match = re.search(
+                    r'public\s+(?:Nullable<(\S+)>|ArraySegment|(\S+))\s+(\w+)\s*{',
+                    line
+                )
+                if prop_match:
+                    nullable_type, full_type, csharp_name = prop_match.groups()
+                    csharp_type = nullable_type if nullable_type else full_type
+
+                    # Skip internal FlatBuffers fields
+                    if csharp_name == 'ByteBuffer' or csharp_name.endswith('Length'):
+                        continue
+
+                    # Determine field type
+                    if csharp_type == 'ArraySegment':
+                        field_type = '[ubyte]'
+                    else:
+                        field_type = csharp_to_fbs_type(csharp_type)
+
+                    current_fields.append({
+                        'original': sanitize_identifier(csharp_name),
+                        'type': field_type
+                    })
+                    continue
+
+    print(f"Parsed {len(all_definitions)} definitions from {line_count:,} lines")
+
+    # Handle global namespace
+    if "_GLOBAL_" in {d['ns'] for d in all_definitions.values()}:
+        for name, data in list(all_definitions.items()):
+            if data['ns'] == "_GLOBAL_":
+                new_name = f"{DEFAULT_NAMESPACE}.{data['name']}"
+                all_definitions[new_name] = data
+                data['ns'] = DEFAULT_NAMESPACE
+                del all_definitions[name]
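+
+    # For illustration, a parsed table entry might look like this
+    # (hypothetical input; field dicts come from the property parser above):
+    #   all_definitions['FlatData.ItemExcel'] = {
+    #       'type': 'table', 'ns': 'FlatData', 'name': 'ItemExcel',
+    #       'fields': [{'original': 'Id', 'type': 'long'},
+    #                  {'original': 'Tags', 'type': '[int]'}]
+    #   }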
+
+    # Filter and resolve dependencies
+    print("Resolving dependencies...")
+    root_types = {name for name, data in all_definitions.items() if data['type'] == 'table'}
+    used_types = set()
+    queue = list(root_types)
+
+    while queue:
+        type_name = queue.pop(0)
+        if type_name in used_types or type_name not in all_definitions:
             continue
-
-        fields = []
-        prop_pattern = re.compile(r'public\s+([\w.<>\[\]?]+)\s+(\w+)\s*{\s*get;\s*set;\s*}')
-        for prop_match in prop_pattern.finditer(body):
-            field_type = prop_match.group(1).replace('?', '')
-            field_name = to_snake_case(prop_match.group(2))
+        used_types.add(type_name)
+        data = all_definitions[type_name]
 
-            # Convert type
-            if field_type in type_map:
-                fbs_type = type_map[field_type]
-            elif field_type.startswith('List<'):
-                inner = field_type[5:-1].replace('?', '')
-                fbs_type = f"[{type_map.get(inner, sanitize(inner))}]"
-            else:
-                fbs_type = sanitize(field_type)
-
-            fields.append((field_name, fbs_type))
+        if data['type'] == 'table':
+            for field in data['fields']:
+                base_type = field['type'].strip('[]')
+                found_dep = find_full_type_name(base_type, data['ns'], all_definitions)
+                if found_dep and found_dep not in used_types:
+                    queue.append(found_dep)
 
-        if fields:
-            tables[name] = fields
+    final_definitions = {name: data for name, data in all_definitions.items() if name in used_types}
 
-    # Find enums
-    enum_pattern = re.compile(r'public\s+enum\s+(\w+)\s*{([^}]+)}', re.DOTALL)
-    for match in enum_pattern.finditer(content):
-        name = match.group(1)
-        body = match.group(2)
-        values = []
-
-        for line in body.split(','):
-            line = line.strip().split('=')[0].strip()
-            if line and not line.startswith('//'):
-                values.append(to_snake_case(line))
-
-        if values:
-            enums[name] = values
-
-    # Generate .fbs file
+    # Separate tables and enums
+    tables = {name: data for name, data in final_definitions.items() if data['type'] == 'table'}
+    enums = {name: data for name, data in final_definitions.items() if data['type'] == 'enum'}
+
+    print(f"Final schema: {len(tables)} tables, {len(enums)} enums")
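+
+    # Note: the loop above is a breadth-first walk over type references, so a
+    # table is kept together with every enum/struct it (transitively) uses,
+    # e.g. (hypothetical) Battle.UnitExcel -> FlatData.UnitType, while
+    # definitions never referenced by any table are dropped from the schema.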
+
+    # Generate FlatBuffers schema file
     print(f"Generating schema file: {output_fbs}")
     with open(output_fbs, 'w', encoding='utf-8') as f:
-        f.write(f"namespace {namespace};\n\n")
+        f.write('// Auto-generated FlatBuffers schema\n')
+        f.write('// Field order is preserved. Key attributes are properly handled.\n\n')
 
-        # Write enums
-        for enum_name, values in sorted(enums.items()):
-            f.write(f"enum {enum_name} : int {{\n")
-            for value in values:
-                f.write(f"    {value},\n")
-            f.write("}\n\n")
+        # Group by namespace
+        defs_by_ns = defaultdict(lambda: {'enums': [], 'tables': []})
 
-        # Write tables
-        for table_name, fields in sorted(tables.items()):
-            f.write(f"table {table_name} {{\n")
-            for field_name, field_type in fields:
-                f.write(f"    {field_name}:{field_type};\n")
-            f.write("}\n\n")
+        for name, data in enums.items():
+            defs_by_ns[data['ns']]['enums'].append(data)
+
+        for name, data in tables.items():
+            defs_by_ns[data['ns']]['tables'].append(data)
+
+        for ns, data in sorted(defs_by_ns.items()):
+            f.write(f'// ----- NAMESPACE: {ns} -----\n')
+            f.write(f'namespace {ns};\n\n')
+
+            # Enums
+            if data['enums']:
+                f.write('// --- Enums ---\n')
+                for definition in sorted(data['enums'], key=lambda x: x['name']):
+                    f.write(f'enum {definition["name"]} : {definition["base"]} {{\n')
+                    for field in definition['fields']:
+                        f.write(f'    {field},\n')
+                    f.write('}\n\n')
+
+            # Tables
+            if data['tables']:
+                f.write('// --- Tables ---\n')
+                for definition in data['tables']:
+                    f.write(f'table {definition["name"]} {{\n')
+
+                    # Handle field naming conflicts
+                    snake_to_original = defaultdict(list)
+                    for field in definition['fields']:
+                        snake_to_original[pascal_to_snake_case(field['original'])].append(field['original'])
+
+                    # Track if key attribute was added
+                    key_field_added = False
+
+                    for field in definition['fields']:
+                        snake_name = pascal_to_snake_case(field['original'])
+                        field_name = (field['original'] if len(snake_to_original[snake_name]) > 1
+                                      else snake_name)
+
+                        is_array = field['type'].startswith('[')
+                        base_type = field['type'].strip('[]')
+                        final_type_str = field['type']
+
+                        # Resolve type references
+                        full_dep_name = find_full_type_name(base_type, definition['ns'], final_definitions)
+                        if full_dep_name:
+                            dep_data = final_definitions[full_dep_name]
+                            simple_name = dep_data['name']
+
+                            if dep_data['ns'] != definition['ns']:
+                                final_type_str = f"{dep_data['ns']}.{simple_name}"
+                            else:
+                                final_type_str = simple_name
+
+                            if is_array:
+                                final_type_str = f"[{final_type_str}]"
+
+                        # Add key attribute for primary key fields
+                        key_suffix = ""
+                        if (not key_field_added and
+                                field_name.lower() in ['key', 'id'] and
+                                not is_array):
+                            key_suffix = " (key)"
+                            key_field_added = True
+
+                        f.write(f'    {field_name}:{final_type_str}{key_suffix};\n')
+
+                    f.write('}\n\n')
 
     print(f"Success! Generated {len(tables)} tables and {len(enums)} enums.")
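+
+# Example usage (illustrative): generate the schema, then compile it with the
+# FlatBuffers compiler, e.g.:
+#     setup_schema_from_csharp('types.cs', 'generated_schema.fbs')
+#     # shell: flatc --python generated_schema.fbs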
 
 
 def setup_repository_mapping(csharp_file='types.cs', output_json='repository_map.json'):
-    """Create repository mapping from C# file.
+    """Parse a C# file to extract repository and database schema information.
+
+    This function creates a mapping file that connects repositories to their
+    corresponding database tables and schema classes. It is based on the
+    proven logic from dumpdbschema.py.
 
     Args:
-        csharp_file (str): Path to C# file
-        output_json (str): Output JSON mapping file
+        csharp_file (str): Path to C# file with type definitions
+        output_json (str): Output JSON mapping file path
     """
     if not validate_required_files(csharp_file):
         return
 
     print(f"Analyzing '{csharp_file}' to create repository mapping...")
 
-    # Parse patterns
-    repo_pattern = re.compile(
-        r'public class (\w+)\s*:\s*BaseExcelRepository<[^,]+,\s*([^,]+),\s*([^>]+)>'
-    )
-    db_schema_pattern = re.compile(r'public class (\w+)\s*:\s*BaseDBSchema')
-    prop_pattern = re.compile(r'public\s+([\w.<>\[\]?]+)\s+(\w+)\s*{\s*get;\s*set;\s*}')
-
-    repositories = OrderedDict()
-    db_schemas = OrderedDict()
-    current_schema = None
-
+    # Read the entire file for processing
+    print("Reading large C# file for repository mapping...")
     with open(csharp_file, 'r', encoding='utf-8') as f:
-        for line in f:
-            line = line.strip().split('//')[0]
-
-            # Repository definition
-            repo_match = repo_pattern.search(line)
-            if repo_match:
-                repo_name = repo_match.group(1)
-                table_class = repo_match.group(2).strip()
-                schema_class = repo_match.group(3).strip()
-                repositories[repo_name] = {
-                    'table_class': table_class,
-                    'schema_class': schema_class
-                }
-                continue
-
-            # DB Schema definition
-            schema_match = db_schema_pattern.search(line)
-            if schema_match:
-                current_schema = schema_match.group(1)
-                db_schemas[current_schema] = {'properties': []}
-                continue
-
-            # Properties
-            if current_schema:
-                prop_match = prop_pattern.search(line)
-                if prop_match:
-                    prop_type = prop_match.group(1)
-                    prop_name = prop_match.group(2)
-                    db_schemas[current_schema]['properties'].append({
-                        'name': prop_name,
-                        'type': prop_type
-                    })
+        content = f.read()
 
-    # Match repositories with schemas
-    mapping = OrderedDict()
-    for repo_name, repo_info in repositories.items():
-        table_class = repo_info['table_class']
-        schema_class = repo_info['schema_class']
+    print(f"File content loaded: {len(content):,} characters")
+
+    # Regular expressions for parsing (improved patterns)
+    repo_pattern = re.compile(
+        r'public class (\w+)\s*:\s*BaseExcelRepository<[^,]+,\s*([^,]+),\s*([^>]+)>',
+        re.MULTILINE
+    )
+    db_schema_pattern = re.compile(r'public class (\w+)\s*:\s*BaseDBSchema', re.MULTILINE)
+    prop_pattern = re.compile(r'public\s+([\w.<>\[\]?]+)\s+(\w+)\s*\{\s*get;\s*set;\s*\}')
+
+    db_schemas = OrderedDict()
+    repositories = OrderedDict()
+
+    print("Parsing repository definitions...")
+    # Find all repository definitions
+    repo_matches = list(repo_pattern.finditer(content))
+    for match in repo_matches:
+        repo_name = match.group(1)
+        db_schema_class = match.group(2).strip()
+        blob_schema_class = match.group(3).strip()
+        repositories[repo_name] = {
+            'db_schema_class': db_schema_class,
+            'blob_schema_class': blob_schema_class
+        }
+
+    print(f"Found {len(repositories)} repository classes")
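+
+    # For illustration (hypothetical C# input), the repository pattern maps
+    #   public class ItemExcelRepository : BaseExcelRepository<long, ItemDBSchema, ItemExcel>
+    # to repo_name='ItemExcelRepository', db_schema_class='ItemDBSchema',
+    # blob_schema_class='ItemExcel'.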
+
+    print("Parsing database schema definitions...")
+    # Find all database schema definitions
+    schema_matches = list(db_schema_pattern.finditer(content))
+
+    for match in schema_matches:
+        schema_name = match.group(1)
 
-    if schema_class in db_schemas:
-        mapping[repo_name] = {
-            'table_name': table_class.replace('Excel', ''),
-            'blob_schema_class': schema_class,
-            'properties': db_schemas[schema_class]['properties']
-        }
+        # Find the class body by locating the opening brace and matching closing brace
+        match_end = match.end()
+
+        # Look for the opening brace after the class declaration
+        brace_start = content.find('{', match_end)
+        if brace_start == -1:
+            continue
+
+        # Find matching closing brace
+        brace_count = 1
+        pos = brace_start + 1
+        brace_end = -1
+
+        while pos < len(content) and brace_count > 0:
+            if content[pos] == '{':
+                brace_count += 1
+            elif content[pos] == '}':
+                brace_count -= 1
+                if brace_count == 0:
+                    brace_end = pos
+                    break
+            pos += 1
+
+        if brace_end > brace_start:
+            # Extract class body
+            class_body = content[brace_start + 1:brace_end]
+
+            # Parse properties in this schema
+            properties = []
+            for prop_match in prop_pattern.finditer(class_body):
+                prop_type = prop_match.group(1)
+                prop_name = prop_match.group(2)
+                properties.append({
+                    'name': prop_name,
+                    'type': prop_type
+                })
+
+            db_schemas[schema_name] = properties
 
-    # Save mapping
+    print(f"Found {len(db_schemas)} database schema classes")
+
+    # Combine information into final mapping
+    final_map = OrderedDict()
+
+    for repo_name, repo_data in repositories.items():
+        db_schema_name = repo_data['db_schema_class']
+
+        # The database table is named after its schema class
+        table_name = db_schema_name
+
+        # The schema's declared properties serve as its key columns
+        key_columns = db_schemas.get(db_schema_name, [])
+
+        final_map[repo_name] = {
+            'table_name': table_name,
+            'key_columns': key_columns,
+            'blob_schema_class': repo_data['blob_schema_class']
+        }
+
+    print(f"Saving repository mapping to '{output_json}'...")
     with open(output_json, 'w', encoding='utf-8') as f:
-        json.dump(mapping, f, indent=2, ensure_ascii=False)
+        json.dump(final_map, f, indent=2, ensure_ascii=False)
 
-    print(f"Success! Created mapping with {len(mapping)} repositories.")
+    print(f"Success! Repository mapping created with {len(final_map)} repositories.")
+    print(f"You can now use '{output_json}' as the source of truth for database operations.")
     print(f"Mapping saved to: {output_json}")
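+
+# Illustrative shape of the generated repository_map.json (hypothetical names):
+# {
+#   "ItemExcelRepository": {
+#     "table_name": "ItemDBSchema",
+#     "key_columns": [{"name": "Key", "type": "long"}],
+#     "blob_schema_class": "ItemExcel"
+#   }
+# }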