mirror of
https://github.com/almet/copanier.git
synced 2025-04-28 19:42:37 +02:00
Add xlsx import
This commit is contained in:
parent
f7b90a392f
commit
7ce15a79e9
4 changed files with 142 additions and 9 deletions
|
@ -8,7 +8,7 @@ from jinja2 import Environment, PackageLoader, select_autoescape
|
||||||
from roll import Roll, Response, HttpError
|
from roll import Roll, Response, HttpError
|
||||||
from roll.extensions import cors, options, traceback, simple_server, static
|
from roll.extensions import cors, options, traceback, simple_server, static
|
||||||
|
|
||||||
from . import config, reports, session, utils, emails, loggers
|
from . import config, reports, session, utils, emails, loggers, imports
|
||||||
from .models import Delivery, Order, Person, Product, ProductOrder
|
from .models import Delivery, Order, Person, Product, ProductOrder
|
||||||
|
|
||||||
|
|
||||||
|
@ -177,14 +177,28 @@ async def create_delivery(request, response):
|
||||||
async def import_products(request, response, id):
|
async def import_products(request, response, id):
|
||||||
delivery = Delivery.load(id)
|
delivery = Delivery.load(id)
|
||||||
delivery.products = []
|
delivery.products = []
|
||||||
reader = csv.DictReader(
|
data = request.files.get("data")
|
||||||
request.files.get("data").read().decode().splitlines(), delimiter=";"
|
path = f"/livraison/{delivery.id}"
|
||||||
)
|
if data.filename.endswith(".csv"):
|
||||||
for row in reader:
|
try:
|
||||||
delivery.products.append(Product(**row))
|
imports.products_from_csv(delivery, data.read().decode())
|
||||||
delivery.persist()
|
except ValueError as err:
|
||||||
|
response.message(err, status="error")
|
||||||
|
response.redirect = path
|
||||||
|
return
|
||||||
|
elif data.filename.endswith('.xlsx'):
|
||||||
|
try:
|
||||||
|
imports.products_from_xlsx(delivery, data)
|
||||||
|
except ValueError as err:
|
||||||
|
response.message(err, status="error")
|
||||||
|
response.redirect = path
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
response.message("Format de fichier inconnu", status="error")
|
||||||
|
response.redirect = path
|
||||||
|
return
|
||||||
response.message("Les produits de la livraison ont bien été mis à jour!")
|
response.message("Les produits de la livraison ont bien été mis à jour!")
|
||||||
response.redirect = f"/livraison/{delivery.id}"
|
response.redirect = path
|
||||||
|
|
||||||
|
|
||||||
@app.route("/livraison/{id}/exporter/produits", methods=["GET"])
|
@app.route("/livraison/{id}/exporter/produits", methods=["GET"])
|
||||||
|
|
38
copanier/imports.py
Normal file
38
copanier/imports.py
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
import csv
|
||||||
|
from zipfile import BadZipFile
|
||||||
|
|
||||||
|
from openpyxl import load_workbook, Workbook
|
||||||
|
|
||||||
|
from .models import Product
|
||||||
|
|
||||||
|
|
||||||
|
PRODUCT_FIELDS = {"ref", "name", "price"}
|
||||||
|
|
||||||
|
|
||||||
|
def products_from_xlsx(delivery, data):
|
||||||
|
if not isinstance(data, Workbook):
|
||||||
|
try:
|
||||||
|
data = load_workbook(data)
|
||||||
|
except BadZipFile:
|
||||||
|
raise ValueError("Impossible de lire le fichier")
|
||||||
|
rows = list(data.active.values)
|
||||||
|
if not rows:
|
||||||
|
raise ValueError
|
||||||
|
headers = rows[0]
|
||||||
|
if not set(headers) >= PRODUCT_FIELDS:
|
||||||
|
raise ValueError("Colonnes obligatoires: name, ref, price")
|
||||||
|
delivery.products = []
|
||||||
|
for row in rows[1:]:
|
||||||
|
raw = {k: v for k, v in dict(zip(headers, row)).items() if v}
|
||||||
|
delivery.products.append(Product(**raw))
|
||||||
|
delivery.persist()
|
||||||
|
|
||||||
|
|
||||||
|
def products_from_csv(delivery, data):
|
||||||
|
reader = csv.DictReader(data.splitlines(), delimiter=";")
|
||||||
|
if not set(reader.fieldnames) >= PRODUCT_FIELDS:
|
||||||
|
raise ValueError("Colonnes obligatoires: name, ref, price")
|
||||||
|
delivery.products = []
|
||||||
|
for row in reader:
|
||||||
|
delivery.products.append(Product(**row))
|
||||||
|
delivery.persist()
|
|
@ -40,7 +40,7 @@
|
||||||
<h3>Importer des produits (CSV)</h3>
|
<h3>Importer des produits (CSV)</h3>
|
||||||
<p>Colonnes: ref*, name*, price*, description</p>
|
<p>Colonnes: ref*, name*, price*, description</p>
|
||||||
<form action="/livraison/{{ delivery.id }}/importer/produits" method="post" enctype="multipart/form-data">
|
<form action="/livraison/{{ delivery.id }}/importer/produits" method="post" enctype="multipart/form-data">
|
||||||
<input type="file" name="data">
|
<input type="file" name="data" required>
|
||||||
<input type="submit" name="Importer des produits">
|
<input type="submit" name="Importer des produits">
|
||||||
</form>
|
</form>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
|
81
tests/test_imports.py
Normal file
81
tests/test_imports.py
Normal file
|
@ -0,0 +1,81 @@
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from openpyxl import Workbook
|
||||||
|
|
||||||
|
from copanier import imports
|
||||||
|
from copanier.models import Product, Delivery
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def workbook():
|
||||||
|
def _(rows, headers=["ref", "name", "price"]):
|
||||||
|
wb = Workbook()
|
||||||
|
ws = wb.active
|
||||||
|
ws.append(headers)
|
||||||
|
for row in rows:
|
||||||
|
ws.append(row)
|
||||||
|
return wb
|
||||||
|
|
||||||
|
return _
|
||||||
|
|
||||||
|
|
||||||
|
def test_mandatory_headers_with_xlsx(delivery, workbook):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
imports.products_from_xlsx(
|
||||||
|
delivery,
|
||||||
|
workbook([("123", "Chocolat", "2.3")], headers=["ref", "nom", "prix"]),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_bad_xlsx_file(delivery, workbook):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
imports.products_from_xlsx(delivery, BytesIO(b"pouet"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_simple_xlsx_import(delivery, workbook):
|
||||||
|
delivery.persist()
|
||||||
|
assert delivery.products == [Product(ref="123", name="Lait", price=1.5)]
|
||||||
|
imports.products_from_xlsx(delivery, workbook([("123", "Lait cru", 1.3)]))
|
||||||
|
assert Delivery.load(delivery.id).products == [
|
||||||
|
Product(ref="123", name="Lait cru", price=1.3)
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_simple_xlsx_import_invalid_price(delivery, workbook):
|
||||||
|
delivery.persist()
|
||||||
|
assert delivery.products == [Product(ref="123", name="Lait", price=1.5)]
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
imports.products_from_xlsx(delivery, workbook([("123", "Lait cru", "invalid")]))
|
||||||
|
assert Delivery.load(delivery.id).products == [
|
||||||
|
Product(ref="123", name="Lait", price=1.5)
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_mandatory_headers_with_csv(delivery):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
imports.products_from_csv(delivery, "ref;nom;prix\n123;Chocolat;2.3")
|
||||||
|
|
||||||
|
|
||||||
|
def test_bad_csv_file(delivery):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
imports.products_from_csv(delivery, "pouet")
|
||||||
|
|
||||||
|
|
||||||
|
def test_simple_csv_import(delivery):
|
||||||
|
delivery.persist()
|
||||||
|
assert delivery.products == [Product(ref="123", name="Lait", price=1.5)]
|
||||||
|
imports.products_from_csv(delivery, "ref;name;price\n123;Lait cru;1.3")
|
||||||
|
assert Delivery.load(delivery.id).products == [
|
||||||
|
Product(ref="123", name="Lait cru", price=1.3)
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_simple_csv_import_invalid_price(delivery):
|
||||||
|
delivery.persist()
|
||||||
|
assert delivery.products == [Product(ref="123", name="Lait", price=1.5)]
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
imports.products_from_csv(delivery, "ref;name;price\n123;Lait cru;invalid")
|
||||||
|
assert Delivery.load(delivery.id).products == [
|
||||||
|
Product(ref="123", name="Lait", price=1.5)
|
||||||
|
]
|
Loading…
Reference in a new issue