mycroforge/MycroForge.CLI/CodeGen/DbEnvInitializer.cs

78 lines
2.6 KiB
C#

using MycroForge.Parsing;
namespace MycroForge.CLI.CodeGen;
public class DbEnvInitializer : PythonSourceModifier
{
private PythonParser.Import_fromContext? _alembicImport;
private PythonParser.AssignmentContext? _targetMetaDataAssignment;
private PythonParser.AssignmentContext? _urlAssignmentContext;
private PythonParser.AssignmentContext? _connectableAssignmentContext;
public DbEnvInitializer(string source) : base(source)
{
}
public override string Rewrite()
{
var tree = Parser.file_input();
Visit(tree);
if (_alembicImport is null)
throw new Exception("Could not find import insertion point.");
if (_targetMetaDataAssignment is null)
throw new Exception("Could not find metadata insertion point.");
if (_urlAssignmentContext is null)
throw new Exception("Could not find url insertion point.");
if (_connectableAssignmentContext is null)
throw new Exception("Could not find connectable insertion point.");
Rewrite(_alembicImport, [
GetOriginalText(_alembicImport),
$"from {Features.Db.FeatureName}.settings import DbSettings",
$"from {Features.Db.FeatureName}.entities.entity_base import EntityBase"
]);
Rewrite(_targetMetaDataAssignment, "target_metadata = EntityBase.metadata");
Rewrite(_urlAssignmentContext, "url = DbSettings.get_connectionstring()");
const string indent = " ";
Rewrite(_connectableAssignmentContext, [
"url = DbSettings.get_connectionstring()",
$"{indent}context.config.set_main_option('sqlalchemy.url', url)",
$"{indent}{GetOriginalText(_connectableAssignmentContext)}"
]);
return Rewriter.GetText();
}
public override object? VisitImport_from(PythonParser.Import_fromContext context)
{
var text = GetOriginalText(context);
if (text == "from alembic import context")
_alembicImport = context;
return base.VisitImport_from(context);
}
public override object? VisitAssignment(PythonParser.AssignmentContext context)
{
var text = GetOriginalText(context);
if (text == "target_metadata = None")
_targetMetaDataAssignment = context;
else if (text == "url = config.get_main_option(\"sqlalchemy.url\")")
_urlAssignmentContext = context;
else if (text.StartsWith("connectable ="))
_connectableAssignmentContext = context;
return base.VisitAssignment(context);
}
}