6.0 KiB
Quick tutorial
Summary: We're gonna build a todo app to demonstrate how the m4g
command works.
TODO: Supplement this section.
Initialize the project
Run m4g init todo-example
to initialize a new project.
Open the newly created project by running code todo-example
, make sure you have vscode
on you machine before running
this command. If you prefer using another editor, then manually open the generated folder.
Setup the database
Summary: This section explains how to interact with the database. TODO: Supplement this section.
Create a docker compose file for the database
Create a file called docker-compose.yml
in the root of your project with the following content.
version: '3.8'
services:
# MariaDB for the database
todo_db:
image: 'mariadb:10.4'
container_name: 'todo_db'
restart: unless-stopped
networks:
- default
ports:
- '3306:3306'
environment:
MYSQL_ROOT_PASSWORD: 'root'
MYSQL_USER: 'root'
MYSQL_PASSWORD: 'root'
MYSQL_DATABASE: 'todo'
volumes:
- 'todo_db:/var/lib/mysql'
# PhpMyAdmin for the database UI
phpmyadmin:
image: phpmyadmin/phpmyadmin
container_name: phpmyadmin
restart: unless-stopped
ports:
- 8888:80
networks:
- default
environment:
PMA_HOST: todo_db
PMA_PORT: 3306
PMA_ARBITRARY: 1
links:
- todo_db
volumes:
todo_db:
Run docker compose up -d
to start the database containers.
You should now be able to log into the database server at http://localhost:8888, with the following credentials.
username: root, password: root
Update the database connectionstring
Replace the contents of db/settings.py
with the following.
class DbSettings:
def get_connectionstring() -> str:
connectionstring = "mysql+asyncmy://root:root@localhost:3306/todo"
return connectionstring
The connectionstring has been hardcoded for demo purposes. In a production app, you would use some kind of secret manager to retrieve it.
Create the entities
m4g db generate entity Tag -c "description:str:String(255)"
m4g db generate entity Todo -c "description:str:String(255)" -c "is_done:bool:Boolean()"
Define a many to many relation between Todo & Tag
m4g db link many Todo --to-many Tag
Generate the migration
m4g db generate migration initial_migration
Apply the migration
m4g db migrate
If you inspect the database in PhpMyAdmin, you should now see a populated schema.
Setup the API
Generate CRUD for Tag & Todo
m4g api generate crud Tag
m4g api generate crud Todo
Modify generated TodoService
Add the following import in api/services/todo_service.py
.
from sqlalchemy.orm import selectinload
Modify TodoService.list
# Before
async with async_session() as session:
stmt = select(Todo)
results = (await session.scalars(stmt)).all()
return results
# After
async with async_session() as session:
stmt = select(Todo).options(selectinload(Todo.tags))
results = (await session.scalars(stmt)).all()
return results
Modify TodoService.get_by_id
# Before
async def get_by_id(self, id: int) -> Optional[Todo]:
async with async_session() as session:
stmt = select(Todo).where(Todo.id == id)
result = (await session.scalars(stmt)).first()
return result
# After
async def get_by_id(self, id: int) -> Optional[Todo]:
async with async_session() as session:
stmt = select(Todo).where(Todo.id == id).options(selectinload(Todo.tags))
result = (await session.scalars(stmt)).first()
return result
Modify TodoService.create
# Before
async def create(self, data: Dict[str, Any]) -> None:
async with async_session() as session:
entity = Todo(**data)
session.add(entity)
await session.commit()
# After
async def create(self, data: Dict[str, Any]) -> None:
tag_ids = []
if "tag_ids" in data.keys():
tag_ids = data["tag_ids"]
del data["tag_ids"]
async with async_session() as session:
entity = Todo(**data)
if len(tag_ids) > 0:
stmt = select(Tag).where(Tag.id.in_(tag_ids))
result = (await session.scalars(stmt)).all()
entity.tags = list(result)
session.add(entity)
await session.commit()
Modify TodoService.update
# Before
async def update(self, id: int, data: Dict[str, Any]) -> bool:
tag_ids = []
if "tag_ids" in data.keys():
tag_ids = data["tag_ids"]
del data["tag_ids"]
async with async_session() as session:
stmt = select(Todo).where(Todo.id == id)
entity = (await session.scalars(stmt)).first()
if entity is None:
return False
else:
for key, value in data.items():
setattr(entity, key, value)
await session.commit()
return True
# After
async def update(self, id: int, data: Dict[str, Any]) -> bool:
tag_ids = []
if "tag_ids" in data.keys():
tag_ids = data["tag_ids"]
del data["tag_ids"]
async with async_session() as session:
stmt = select(Todo).where(Todo.id == id).options(selectinload(Todo.tags))
entity = (await session.scalars(stmt)).first()
if entity is None:
return False
else:
for key, value in data.items():
setattr(entity, key, value)
if len(tag_ids) > 0:
stmt = select(Tag).where(Tag.id.in_(tag_ids))
result = (await session.scalars(stmt)).all()
entity.tags = list(result)
else:
entity.tags = []
await session.commit()
return True