5.7 KiB
Quick tutorial
Summary: We're going to build a todo app to demonstrate how the m4g
command line tool works.
TODO: Supplement this section.
Initialize the project
Run m4g init todo-example
to initialize a new project.
Open the newly created project by running code todo-example
, make sure you have vscode
on your machine before running
this command. If you prefer using another editor, then manually open the generated folder.
Setup the database
Start the database
m4g db run
Create the entities
m4g db generate entity Tag -c "description:str:String(255)"
m4g db generate entity Todo -c "description:str:String(255)" -c "is_done:bool:Boolean()"
Define a many-to-many relationship between Todo & Tag
m4g db link many Todo --to-many Tag
Generate the migration
m4g db generate migration initial_migration
Apply the migration
m4g db migrate
If you inspect the database in phpMyAdmin, you should now see a populated schema.
To stop the database, you can run m4g db stop
Setup the API
Generate CRUD for Tag & Todo
m4g api generate crud Tag
m4g api generate crud Todo
Modify the generated Todo request classes
Modify CreateTodoRequest
in api/requests/create_todo_request.py
, you might need to import List and Optional
from typing
# Before
class CreateTodoRequest(BaseModel):
description: str = None
is_done: bool = None
# After
class CreateTodoRequest(BaseModel):
description: str = None
is_done: bool = None
tag_ids: Optional[List[int]] = []
Modify UpdateTodoRequest
in api/requests/update_todo_request.py
, you might need to import List
from typing
# Before
class UpdateTodoRequest(BaseModel):
description: Optional[str] = None
is_done: Optional[bool] = None
# After
class UpdateTodoRequest(BaseModel):
description: Optional[str] = None
is_done: Optional[bool] = None
tag_ids: Optional[List[int]] = []
Modify generated TodoService
Add the following imports in api/services/todo_service.py
.
from sqlalchemy.orm import selectinload
from db.entities.tag import Tag
Modify TodoService.list
# Before
async with async_session() as session:
stmt = select(Todo)
results = (await session.scalars(stmt)).all()
return results
# After
async with async_session() as session:
stmt = select(Todo).options(selectinload(Todo.tags))
results = (await session.scalars(stmt)).all()
return results
Modify TodoService.get_by_id
# Before
async def get_by_id(self, id: int) -> Optional[Todo]:
async with async_session() as session:
stmt = select(Todo).where(Todo.id == id)
result = (await session.scalars(stmt)).first()
return result
# After
async def get_by_id(self, id: int) -> Optional[Todo]:
async with async_session() as session:
stmt = select(Todo).where(Todo.id == id).options(selectinload(Todo.tags))
result = (await session.scalars(stmt)).first()
return result
Modify TodoService.create
# Before
async def create(self, data: Dict[str, Any]) -> None:
async with async_session() as session:
entity = Todo(**data)
session.add(entity)
await session.commit()
# After
async def create(self, data: Dict[str, Any]) -> None:
tag_ids = []
if "tag_ids" in data.keys():
tag_ids = data["tag_ids"]
del data["tag_ids"]
async with async_session() as session:
entity = Todo(**data)
if len(tag_ids) > 0:
stmt = select(Tag).where(Tag.id.in_(tag_ids))
result = (await session.scalars(stmt)).all()
entity.tags = list(result)
session.add(entity)
await session.commit()
Modify TodoService.update
# Before
async def update(self, id: int, data: Dict[str, Any]) -> bool:
async with async_session() as session:
stmt = select(Todo).where(Todo.id == id)
entity = (await session.scalars(stmt)).first()
if entity is None:
return False
else:
for key, value in data.items():
setattr(entity, key, value)
await session.commit()
return True
# After
async def update(self, id: int, data: Dict[str, Any]) -> bool:
tag_ids = []
if "tag_ids" in data.keys():
tag_ids = data["tag_ids"]
del data["tag_ids"]
async with async_session() as session:
stmt = select(Todo).where(Todo.id == id).options(selectinload(Todo.tags))
entity = (await session.scalars(stmt)).first()
if entity is None:
return False
else:
for key, value in data.items():
setattr(entity, key, value)
if len(tag_ids) > 0:
stmt = select(Tag).where(Tag.id.in_(tag_ids))
result = (await session.scalars(stmt)).all()
entity.tags = list(result)
else:
entity.tags = []
await session.commit()
return True