mycroforge/docs/todo-example/README.md

5.7 KiB

Quick tutorial

Summary: We're going to build a todo app to demonstrate how the m4g command line tool works.
TODO: Supplement this section.

Initialize the project

Run m4g init todo-example to initialize a new project.

Open the newly created project by running code todo-example. Make sure you have VS Code installed on your machine before running this command. If you prefer another editor, open the generated folder manually.

Setup the database

Start the database

m4g db run

Create the entities

m4g db generate entity Tag -c "description:str:String(255)"

m4g db generate entity Todo -c "description:str:String(255)" -c "is_done:bool:Boolean()"

Define a many to many relation between Todo & Tag

m4g db link many Todo --to-many Tag

Generate the migration

m4g db generate migration initial_migration

Apply the migration

m4g db migrate

If you inspect the database in phpMyAdmin, you should now see a populated schema.

To stop the database, you can run m4g db stop

Setup the API

Generate CRUD for Tag & Todo

m4g api generate crud Tag

m4g api generate crud Todo

Modify the generated Todo request classes

Modify CreateTodoRequest in api/requests/create_todo_request.py. You might need to import List and Optional from typing.

# Before
class CreateTodoRequest(BaseModel):
    description: str = None
    is_done: bool = None
    
# After
class CreateTodoRequest(BaseModel):
    """Request body for creating a Todo."""

    description: str = None
    is_done: bool = None
    # New field: IDs of the Tag rows to attach to the new Todo.
    # Defaults to an empty list, i.e. the Todo is created without tags.
    tag_ids: Optional[List[int]] = []

Modify UpdateTodoRequest in api/requests/update_todo_request.py. You might need to import List from typing.

# Before (as generated by m4g api generate crud Todo; there is no tag_ids field yet)
class UpdateTodoRequest(BaseModel):
    description: Optional[str] = None
    is_done: Optional[bool] = None

# After
class UpdateTodoRequest(BaseModel):
    """Request body for updating a Todo."""

    description: Optional[str] = None
    is_done: Optional[bool] = None
    # New field: IDs of the Tag rows the Todo should be linked to after the
    # update. TodoService.update replaces the Todo's tags with this set, so
    # the default empty list removes all tags.
    tag_ids: Optional[List[int]] = []

Modify generated TodoService

Add the following imports in api/services/todo_service.py.

from sqlalchemy.orm import selectinload
from db.entities.tag import Tag

Modify TodoService.list

# Before
    async with async_session() as session:
        stmt = select(Todo)
        results = (await session.scalars(stmt)).all()
        return results

# After
    async with async_session() as session:
        # selectinload(Todo.tags) loads the tags of every selected Todo
        # eagerly, as part of executing this statement, instead of on first
        # attribute access. Presumably needed so the tags are still readable
        # once the session is closed -- confirm against the router code.
        stmt = select(Todo).options(selectinload(Todo.tags))
        results = (await session.scalars(stmt)).all()
        return results

Modify TodoService.get_by_id

# Before
    async def get_by_id(self, id: int) -> Optional[Todo]:
        async with async_session() as session:
            stmt = select(Todo).where(Todo.id == id)
            result = (await session.scalars(stmt)).first()
            return result

# After
    async def get_by_id(self, id: int) -> Optional[Todo]:
        """Return the Todo whose primary key equals ``id``, with its tags loaded.

        Returns ``None`` when no row matches.
        """
        async with async_session() as session:
            # Same query as before, plus eager loading of the related tags.
            stmt = select(Todo).where(Todo.id == id).options(selectinload(Todo.tags))
            result = (await session.scalars(stmt)).first()
            return result

Modify TodoService.create

# Before
    async def create(self, data: Dict[str, Any]) -> None:
        async with async_session() as session:
            entity = Todo(**data)
            session.add(entity)
            await session.commit()

# After
    async def create(self, data: Dict[str, Any]) -> None:
        """Create a Todo from ``data`` and link it to the requested tags.

        ``data`` holds the column values for the new Todo. Its optional
        ``tag_ids`` entry is removed from ``data`` before the entity is built,
        because it is not a constructor argument of ``Todo``. A missing,
        ``None`` or empty ``tag_ids`` creates the Todo without tags.
        """
        # tag_ids is declared Optional on CreateTodoRequest, so it may arrive
        # as None; normalise that to an empty list instead of failing later.
        tag_ids = data.pop("tag_ids", None) or []

        async with async_session() as session:
            entity = Todo(**data)

            if tag_ids:
                # Resolve the IDs to Tag entities; unknown IDs simply match
                # no row and are therefore ignored.
                stmt = select(Tag).where(Tag.id.in_(tag_ids))
                result = (await session.scalars(stmt)).all()
                entity.tags = list(result)

            session.add(entity)
            await session.commit()

Modify TodoService.update

# Before
    async def update(self, id: int, data: Dict[str, Any]) -> bool:
        tag_ids = []
    
        if "tag_ids" in data.keys():
            tag_ids = data["tag_ids"]
            del data["tag_ids"]
    
        async with async_session() as session:
            stmt = select(Todo).where(Todo.id == id)
            entity = (await session.scalars(stmt)).first()
    
            if entity is None:
                return False
            else:
                for key, value in data.items():
                    setattr(entity, key, value)
                await session.commit()
                return True

# After
    async def update(self, id: int, data: Dict[str, Any]) -> bool:
        """Update the Todo identified by ``id`` and replace its tags.

        Every remaining key in ``data`` is written onto the entity as an
        attribute. The optional ``tag_ids`` entry is removed from ``data``
        first and used to replace the Todo's tags.

        NOTE: a missing, ``None`` or empty ``tag_ids`` removes all tags from
        the Todo.

        Returns ``True`` when the Todo was found and updated, ``False`` when
        no Todo has the given ``id``.
        """
        # tag_ids is declared Optional on UpdateTodoRequest, so it may arrive
        # as None; normalise that to an empty list instead of failing later.
        tag_ids = data.pop("tag_ids", None) or []

        async with async_session() as session:
            # Eager-load the current tags so the collection can be replaced
            # below without triggering a lazy load.
            stmt = select(Todo).where(Todo.id == id).options(selectinload(Todo.tags))
            entity = (await session.scalars(stmt)).first()

            if entity is None:
                return False
            else:
                for key, value in data.items():
                    setattr(entity, key, value)

                if tag_ids:
                    # Resolve the IDs to Tag entities; unknown IDs match no
                    # row and are therefore ignored.
                    stmt = select(Tag).where(Tag.id.in_(tag_ids))
                    result = (await session.scalars(stmt)).all()
                    entity.tags = list(result)
                else:
                    entity.tags = []

                await session.commit()
                return True