non working MongoDB custom ODM

This commit is contained in:
Yohan Boniface 2019-03-17 19:30:10 +01:00
commit b25d8a28c8
10 changed files with 465 additions and 0 deletions

3
.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
__pycache__
*.egg-info
tmp/

190
kaba/__init__.py Normal file
View file

@ -0,0 +1,190 @@
import os
from time import perf_counter
import ujson as json
import hupper
import minicli
from bson import ObjectId
from jinja2 import Environment, PackageLoader, select_autoescape
from pymongo import MongoClient
from roll import Roll, Response
from roll.extensions import cors, options, traceback, simple_server
from .base import Document, Str, Float, Array, Email, Int, Reference, Datetime, Mapping
class Response(Response):
def html(self, template_name, *args, **kwargs):
self.headers["Content-Type"] = "text/html; charset=utf-8"
context = app.context()
context.update(kwargs)
context["request"] = self.request
if self.request.cookies.get("message"):
context["message"] = json.loads(self.request.cookies["message"])
self.cookies.set("message", "")
self.body = env.get_template(template_name).render(*args, **context)
class Roll(Roll):
Response = Response
_context_func = []
def context(self):
context = {}
for func in self._context_func:
context.update(func())
return context
def register_context(self, func):
self._context_func.append(func)
env = Environment(
loader=PackageLoader("kaba", "templates"), autoescape=select_autoescape(["kaba"])
)
class Producer(Document):
__collection__ = "producers"
name = Str(required=True)
@property
def products(self):
return Product.find(producer=self._id)
class Product(Document):
__collection__ = "products"
producer = Reference(Producer, required=True)
name = Str(required=True)
ref = Str(required=True)
description = Str()
price = Float(required=True)
class Person(Document):
__collection__ = "persons"
first_name = Str()
last_name = Str()
email = Email()
class ProductOrder(Document):
ref = Str()
wanted = Int()
ordered = Int()
class PersonOrder(Document):
person = Str()
products = Array(ProductOrder)
class Order(Document):
__collection__ = "orders"
when = Datetime(required=True)
where = Str()
producer = Reference(Producer, required=True)
products = Array(Product)
orders = Mapping(Str, PersonOrder)
app = Roll()
cors(app, methods="*", headers="*")
options(app)
@app.listen("request")
async def attach_request(request, response):
response.request = request
@app.listen("startup")
async def on_startup():
connect()
@app.route("/", methods=["GET"])
async def home(request, response):
response.html("home.html", {"orders": Order.find()})
@app.route("/commande/{order_id}", methods=["GET"])
async def get_order(request, response, order_id):
order = Order.find_one(_id=ObjectId(order_id))
response.html(
"order.html",
{"order": order, "person": request.query.get("email"), "person_order": None},
)
@app.route("/commande/{order_id}", methods=["POST"])
async def place_order(request, response, order_id):
order = Order.find_one(_id=ObjectId(order_id))
email = request.query.get("email")
person_order = PersonOrder(person=email)
form = request.form
for product in order.products:
quantity = form.int(product.ref, 0)
if quantity:
person_order.products.append(ProductOrder(ref=product.ref, wanted=quantity))
if not order.orders:
order.orders = {}
order.orders[email] = person_order
order.replace_one()
response.headers["Location"] = request.url.decode()
response.status = 302
def connect():
db = os.environ.get("KABA_DB", "mongodb://localhost/kaba")
client = MongoClient(db)
db = client.get_database()
Producer.bind(db)
Product.bind(db)
Order.bind(db)
Person.bind(db)
return client
@minicli.cli()
def shell():
"""Run an ipython already connected to Mongo."""
try:
from IPython import start_ipython
except ImportError:
print('IPython is not installed. Type "pip install ipython"')
else:
start_ipython(
argv=[],
user_ns={
"Producer": Producer,
"app": app,
"Product": Product,
"Person": Person,
"Order": Order,
},
)
@minicli.wrap
def cli_wrapper():
connect()
start = perf_counter()
yield
elapsed = perf_counter() - start
print(f"Done in {elapsed:.5f} seconds.")
@minicli.cli
def serve(reload=False):
"""Run a web server (for development only)."""
if reload:
hupper.start_reloader("kaba.serve")
traceback(app)
simple_server(app, port=2244)
def main():
minicli.run()

169
kaba/base.py Normal file
View file

@ -0,0 +1,169 @@
from datetime import datetime
from bson import ObjectId
class classproperty:
def __init__(self, f):
self.f = f
def __get__(self, obj, owner):
return self.f(owner)
class DoesNotExist(ValueError):
pass
class Field:
name = None
coerce = None
def __init__(self, choices=[], required=False, default=None):
self.choices = choices
self.required = required
self.default = default
def __get__(self, obj, type=None):
if obj is None:
return self
value = obj.get(self.name)
return value
def __set__(self, obj, value):
print("set", value, id(value))
value = self.coerce(value)
print("set after", value, id(value))
obj[self.name] = value
class Str(Field):
coerce = str
class Float(Field):
coerce = float
class Int(Field):
coerce = int
class Datetime(Field):
def coerce(self, value):
if isinstance(value, datetime):
return value
if isinstance(value, int):
return datetime.fromtimestamp(value)
class Email(Field):
def coerce(self, value):
# TODO proper validation
if "@" not in value:
raise ValueError(f"Invalid value for email: {value}")
return value
class Reference(Field):
def coerce(self, value):
if isinstance(value, dict):
value = value["_id"]
return ObjectId(value)
def __init__(self, document, *args, **kwargs):
self.document = document
return super().__init__(*args, **kwargs)
class Dict(Field):
coerce = dict
class Mapping(Field):
def __init__(self, key_field, value_field, *args, **kwargs):
self.key_field = key_field
self.value_field = value_field
return super().__init__(*args, **kwargs)
def coerce(self, value):
print("coerce raw", value, id(value))
if value is None:
value = {}
if not isinstance(value, dict):
raise ValueError(f"{value} is not a dict")
print("coerce", value, id(value))
return {
self.key_field.coerce(k): self.value_field.coerce(v)
for k, v in value.items()
}
class Array(Field):
def __init__(self, type, *args, **kwargs):
self.coerce = type
return super().__init__(*args, **kwargs)
def __get__(self, obj, type=None):
if obj is None:
return self
value = obj.get(self.name)
if value is None:
value = []
self.__set__(value)
return value
def __set__(self, obj, value):
obj[self.name] = [self.coerce(v) for v in value or []]
class MetaDocument(type):
def __new__(cls, name, bases, attrs):
for attr_name, attr_value in attrs.items():
if not isinstance(attr_value, Field):
continue
attr_value.name = attr_name
return super().__new__(cls, name, bases, attrs)
class Document(dict, metaclass=MetaDocument):
__db__ = None
__collection__ = None
# def __repr__(self):
# return f"<{self.__class__.__name__} {self._id}>"
@property
def _id(self):
return self["_id"]
def insert_one(self):
self.collection.insert_one(self)
return self
def replace_one(self):
self.collection.replace_one({"_id": self._id}, self)
return self
@classmethod
def find_one(cls, **kwargs):
raw = cls.collection.find_one(kwargs)
if not raw:
raise DoesNotExist
return cls(**raw)
@classmethod
def find(cls, **kwargs):
for raw in cls.collection.find(kwargs):
yield cls(**raw)
@classproperty
def collection(cls):
assert cls.__collection__ is not None, f"You must define a {cls}.__collection__"
return cls.__db__[cls.__collection__]
@classmethod
def bind(cls, db):
cls.__db__ = db

17
kaba/templates/base.html Normal file
View file

@ -0,0 +1,17 @@
<!DOCTYPE html>
<html>
<head>
<title>{% if title %}{{ title }} - {% endif %}Commandes Epinamap</title>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/font-awesome/4.7.0/css/font-awesome.min.css" type="text/css">
<link rel="stylesheet" type="text/css" href="https://cdnjs.cloudflare.com/ajax/libs/bulma/0.4.1/css/bulma.css">
{% block head %}
{% endblock head %}
</head>
<body>
{% block body %}
{% endblock body %}
</body>
</html>

14
kaba/templates/home.html Normal file
View file

@ -0,0 +1,14 @@
{% extends "base.html" %}
{% block body %}
{% for order in orders %}
<p>Producteur: {{ order.producer }}</p>
<p>Lieu: {{ order.where }}</p>
<p>Date: {{ order.when }}</p>
<form action="/commande/{{ order._id }}">
<label for="email">Participer à la commande</label>
<input type="text" name="email">
<input type="submit" value="Commander">
</form>
{% endfor %}
{% endblock body %}

18
kaba/templates/order.html Normal file
View file

@ -0,0 +1,18 @@
{% extends "base.html" %}
{% block body %}
<p>Producteur: {{ order.producer }}</p>
<p>Lieu: {{ order.where }}</p>
<p>Date: {{ order.when }}</p>
<p>Commande pour {{ person }}</p>
<form method="post">
<table>
<tr><th>Produit</th><th>Prix</th><th>Quantité</th></tr>
{% for product in order.products %}
<tr><th>{{ product.name }}</th><td>{{ product.price }} €</td><td><input type="number" name="{{ product.ref }}"></td></tr>
{% endfor %}
</table>
<input type="hidden" name="email" value="{{ person }}">
<input type="submit" value="Valider ma commande">
</form>
{% endblock body %}

7
setup.cfg Normal file
View file

@ -0,0 +1,7 @@
[metadata]
name = kaba
version = 0.0.1
[options.entry_points]
console_scripts =
kaba = kaba:main

2
setup.py Normal file
View file

@ -0,0 +1,2 @@
from setuptools import setup
setup()

28
tests/conftest.py Normal file
View file

@ -0,0 +1,28 @@
import os
import pytest
from roll.extensions import traceback
# Before loading any eurordis module.
os.environ["KABA_DB"] = "mongodb://localhost/kaba_test"
from kaba import app as kaba_app
from kaba import connect, Producer
def pytest_configure(config):
client = connect()
client.drop_database("test_kaba")
def pytest_runtest_setup(item):
# assert get_db().name == "test_eurordis"
for cls in [Producer]:
collection = cls.collection
collection.drop()
@pytest.fixture
def app(): # Requested by Roll testing utilities.
traceback(kaba_app)
return kaba_app

17
tests/test_models.py Normal file
View file

@ -0,0 +1,17 @@
from kaba import Producer, Order, Product
def test_can_create_producer():
producer = Producer(name="Andines")
producer.insert_one()
assert producer.name == "Andines"
retrieved = Producer.find_one(name="Andines")
assert retrieved.name == producer.name
assert retrieved._id == producer._id
def test_can_create_order():
order = Order(products=[Product(name="riz", price="2.4")])
order.insert_one()
retrieved = Order.find_one(_id=order._id)
assert retrieved.products[0].name == "riz"