diff --git a/ihatemoney/run.py b/ihatemoney/run.py
index 69b9b32f..88f49463 100644
--- a/ihatemoney/run.py
+++ b/ihatemoney/run.py
@@ -1,3 +1,4 @@
+from datetime import datetime
import os
import os.path
import warnings
@@ -21,6 +22,7 @@ from ihatemoney.utils import (
IhmJSONEncoder,
PrefixedWSGI,
em_surround,
+ limiter,
locale_from_iso,
localize_list,
minimal_round,
@@ -170,6 +172,7 @@ def create_app(
app.register_blueprint(web_interface)
app.register_blueprint(apiv1)
app.register_error_handler(404, page_not_found)
+ limiter.init_app(app)
# Configure the a, root="main"pplication
setup_database(app)
@@ -187,6 +190,7 @@ def create_app(
app.jinja_env.filters["minimal_round"] = minimal_round
app.jinja_env.filters["em_surround"] = lambda text: Markup(em_surround(text))
app.jinja_env.filters["localize_list"] = localize_list
+ app.jinja_env.filters["from_timestamp"] = datetime.fromtimestamp
# Translations and time zone (used to display dates). The timezone is
# taken from the BABEL_DEFAULT_TIMEZONE settings, and falls back to
diff --git a/ihatemoney/templates/layout.html b/ihatemoney/templates/layout.html
index 67819adb..9a4ca0f0 100644
--- a/ihatemoney/templates/layout.html
+++ b/ihatemoney/templates/layout.html
@@ -134,6 +134,12 @@
{% block body %}
+ {% if breached_limit %}
+
+ {{ limit_message }}
+ {{ _("Please retry after %(date)s.", date=breached_limit.reset_at | from_timestamp | datetimeformat )}}
+
+ {% endif %}
{% block content %}{% endblock %}
{% endblock %}
diff --git a/ihatemoney/utils.py b/ihatemoney/utils.py
index 513d35ea..ea2e3ac9 100644
--- a/ihatemoney/utils.py
+++ b/ihatemoney/utils.py
@@ -1,6 +1,5 @@
import ast
import csv
-from datetime import datetime, timedelta
import email.utils
from enum import Enum
from io import BytesIO, StringIO, TextIOWrapper
@@ -15,11 +14,18 @@ from babel import Locale
from babel.numbers import get_currency_name, get_currency_symbol
from flask import current_app, flash, redirect, render_template
from flask_babel import get_locale, lazy_gettext as _
+from flask_limiter import Limiter
+from flask_limiter.util import get_remote_address
import jinja2
from markupsafe import Markup, escape
from werkzeug.exceptions import HTTPException
from werkzeug.routing import RoutingException
+limiter = limiter = Limiter(
+ current_app,
+ key_func=get_remote_address,
+)
+
def slugify(value):
"""Normalizes string, converts to lowercase, removes non-alpha characters,
@@ -213,45 +219,6 @@ def csv2list_of_dicts(csv_to_convert):
return result
-class LoginThrottler:
- """Simple login throttler used to limit authentication attempts based on client's ip address.
- When using multiple workers, remaining number of attempts can get inconsistent
- but will still be limited to num_workers * max_attempts.
- """
-
- def __init__(self, max_attempts=3, delay=1):
- self._max_attempts = max_attempts
- # Delay in minutes before resetting the attempts counter
- self._delay = delay
- self._attempts = {}
-
- def get_remaining_attempts(self, ip):
- return self._max_attempts - self._attempts.get(ip, [datetime.now(), 0])[1]
-
- def increment_attempts_counter(self, ip):
- # Reset all attempt counters when they get hungry for memory
- if len(self._attempts) > 10000:
- self.__init__()
- if self._attempts.get(ip) is None:
- # Store first attempt date and number of attempts since
- self._attempts[ip] = [datetime.now(), 0]
- self._attempts.get(ip)[1] += 1
-
- def is_login_allowed(self, ip):
- if self._attempts.get(ip) is None:
- return True
- # When the delay is expired, reset the counter
- if datetime.now() - self._attempts.get(ip)[0] > timedelta(minutes=self._delay):
- self.reset(ip)
- return True
- if self._attempts.get(ip)[1] >= self._max_attempts:
- return False
- return True
-
- def reset(self, ip):
- self._attempts.pop(ip, None)
-
-
def create_jinja_env(folder, strict_rendering=False):
"""Creates and return a Jinja2 Environment object, used, to load the
templates.
diff --git a/ihatemoney/web.py b/ihatemoney/web.py
index c2f19c06..af50aed2 100644
--- a/ihatemoney/web.py
+++ b/ihatemoney/web.py
@@ -19,6 +19,7 @@ from flask import (
current_app,
flash,
g,
+ make_response,
redirect,
render_template,
request,
@@ -56,11 +57,11 @@ from ihatemoney.forms import (
from ihatemoney.history import get_history, get_history_queries, purge_history
from ihatemoney.models import Bill, LoggingMode, Person, Project, db
from ihatemoney.utils import (
- LoginThrottler,
Redirect303,
csv2list_of_dicts,
flash_email_error,
format_form_errors,
+ limiter,
list_of_dicts2csv,
list_of_dicts2json,
render_localized_template,
@@ -69,8 +70,6 @@ from ihatemoney.utils import (
main = Blueprint("main", __name__)
-login_throttler = LoginThrottler(max_attempts=3, delay=1)
-
def requires_admin(bypass=None):
"""Require admin permissions for @requires_admin decorated endpoints.
@@ -161,7 +160,22 @@ def health():
return "OK"
+def admin_limit(limit):
+ return make_response(
+ render_template(
+ "admin.html",
+ breached_limit=limit,
+ limit_message=_("Too many failed login attempts."),
+ )
+ )
+
+
@main.route("/admin", methods=["GET", "POST"])
+@limiter.limit(
+ "3/minute",
+ on_breach=admin_limit,
+ methods=["POST"],
+)
def admin():
"""Admin authentication.
@@ -170,33 +184,19 @@ def admin():
form = AdminAuthenticationForm()
goto = request.args.get("goto", url_for(".home"))
is_admin_auth_enabled = bool(current_app.config["ADMIN_PASSWORD"])
- if request.method == "POST":
- client_ip = request.remote_addr
- if not login_throttler.is_login_allowed(client_ip):
- msg = _("Too many failed login attempts, please retry later.")
- form["admin_password"].errors = [msg]
- return render_template(
- "admin.html",
- form=form,
- admin_auth=True,
- is_admin_auth_enabled=is_admin_auth_enabled,
- )
- if form.validate():
- # Valid password
- if check_password_hash(
- current_app.config["ADMIN_PASSWORD"], form.admin_password.data
- ):
- session["is_admin"] = True
- session.update()
- login_throttler.reset(client_ip)
- return redirect(goto)
- # Invalid password
- login_throttler.increment_attempts_counter(client_ip)
- msg = _(
- "This admin password is not the right one. Only %(num)d attempts left.",
- num=login_throttler.get_remaining_attempts(client_ip),
- )
- form["admin_password"].errors = [msg]
+ if request.method == "POST" and form.validate():
+ # Valid password
+ if check_password_hash(
+ current_app.config["ADMIN_PASSWORD"], form.admin_password.data
+ ):
+ session["is_admin"] = True
+ session.update()
+ return redirect(goto)
+ msg = _(
+ "This admin password is not the right one. Only %(num)d attempts left.",
+ num=limiter.current_limit.remaining,
+ )
+ form["admin_password"].errors = [msg]
return render_template(
"admin.html",
form=form,
diff --git a/setup.cfg b/setup.cfg
index 73ebcb08..01decaf9 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -30,6 +30,7 @@ install_requires =
email_validator>=1.0,<2
Flask-Babel>=1.0,<3
Flask-Cors>=3.0.8,<4
+ Flask-Limiter>=2.6,<3
Flask-Mail>=0.9.1,<1
Flask-Migrate>=2.5.3,<4 # Not following semantic versioning (e.g. https://github.com/miguelgrinberg/flask-migrate/commit/1af28ba273de6c88544623b8dc02dd539340294b)
Flask-RESTful>=0.3.9,<1