Implement rate limiting with Flask-Limiter

Fixes #1054
This commit is contained in:
Glandos 2022-08-28 15:11:23 +02:00 committed by Alexis Metaireau
parent d8b6355101
commit e7ab3c1a95
5 changed files with 48 additions and 70 deletions

View file

@ -1,3 +1,4 @@
from datetime import datetime
import os
import os.path
import warnings
@ -21,6 +22,7 @@ from ihatemoney.utils import (
IhmJSONEncoder,
PrefixedWSGI,
em_surround,
limiter,
locale_from_iso,
localize_list,
minimal_round,
@ -170,6 +172,7 @@ def create_app(
app.register_blueprint(web_interface)
app.register_blueprint(apiv1)
app.register_error_handler(404, page_not_found)
limiter.init_app(app)
# Configure the a, root="main"pplication
setup_database(app)
@ -187,6 +190,7 @@ def create_app(
app.jinja_env.filters["minimal_round"] = minimal_round
app.jinja_env.filters["em_surround"] = lambda text: Markup(em_surround(text))
app.jinja_env.filters["localize_list"] = localize_list
app.jinja_env.filters["from_timestamp"] = datetime.fromtimestamp
# Translations and time zone (used to display dates). The timezone is
# taken from the BABEL_DEFAULT_TIMEZONE settings, and falls back to

View file

@ -134,6 +134,12 @@
<div class="container-fluid flex-shrink-0 {% if request.url_rule.endpoint == 'main.home' %} home-container {% endif %}">
{% block body %}
<main class="content offset-1 col-10">
{% if breached_limit %}
<p class="alert alert-danger">
{{ limit_message }}<br />
{{ _("Please retry after %(date)s.", date=breached_limit.reset_at | from_timestamp | datetimeformat )}}
</p>
{% endif %}
{% block content %}{% endblock %}
</main>
{% endblock %}

View file

@ -1,6 +1,5 @@
import ast
import csv
from datetime import datetime, timedelta
import email.utils
from enum import Enum
from io import BytesIO, StringIO, TextIOWrapper
@ -15,11 +14,18 @@ from babel import Locale
from babel.numbers import get_currency_name, get_currency_symbol
from flask import current_app, flash, redirect, render_template
from flask_babel import get_locale, lazy_gettext as _
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import jinja2
from markupsafe import Markup, escape
from werkzeug.exceptions import HTTPException
from werkzeug.routing import RoutingException
limiter = limiter = Limiter(
current_app,
key_func=get_remote_address,
)
def slugify(value):
"""Normalizes string, converts to lowercase, removes non-alpha characters,
@ -213,45 +219,6 @@ def csv2list_of_dicts(csv_to_convert):
return result
class LoginThrottler:
"""Simple login throttler used to limit authentication attempts based on client's ip address.
When using multiple workers, remaining number of attempts can get inconsistent
but will still be limited to num_workers * max_attempts.
"""
def __init__(self, max_attempts=3, delay=1):
self._max_attempts = max_attempts
# Delay in minutes before resetting the attempts counter
self._delay = delay
self._attempts = {}
def get_remaining_attempts(self, ip):
return self._max_attempts - self._attempts.get(ip, [datetime.now(), 0])[1]
def increment_attempts_counter(self, ip):
# Reset all attempt counters when they get hungry for memory
if len(self._attempts) > 10000:
self.__init__()
if self._attempts.get(ip) is None:
# Store first attempt date and number of attempts since
self._attempts[ip] = [datetime.now(), 0]
self._attempts.get(ip)[1] += 1
def is_login_allowed(self, ip):
if self._attempts.get(ip) is None:
return True
# When the delay is expired, reset the counter
if datetime.now() - self._attempts.get(ip)[0] > timedelta(minutes=self._delay):
self.reset(ip)
return True
if self._attempts.get(ip)[1] >= self._max_attempts:
return False
return True
def reset(self, ip):
self._attempts.pop(ip, None)
def create_jinja_env(folder, strict_rendering=False):
"""Creates and return a Jinja2 Environment object, used, to load the
templates.

View file

@ -19,6 +19,7 @@ from flask import (
current_app,
flash,
g,
make_response,
redirect,
render_template,
request,
@ -56,11 +57,11 @@ from ihatemoney.forms import (
from ihatemoney.history import get_history, get_history_queries, purge_history
from ihatemoney.models import Bill, LoggingMode, Person, Project, db
from ihatemoney.utils import (
LoginThrottler,
Redirect303,
csv2list_of_dicts,
flash_email_error,
format_form_errors,
limiter,
list_of_dicts2csv,
list_of_dicts2json,
render_localized_template,
@ -69,8 +70,6 @@ from ihatemoney.utils import (
main = Blueprint("main", __name__)
login_throttler = LoginThrottler(max_attempts=3, delay=1)
def requires_admin(bypass=None):
"""Require admin permissions for @requires_admin decorated endpoints.
@ -161,7 +160,22 @@ def health():
return "OK"
def admin_limit(limit):
return make_response(
render_template(
"admin.html",
breached_limit=limit,
limit_message=_("Too many failed login attempts."),
)
)
@main.route("/admin", methods=["GET", "POST"])
@limiter.limit(
"3/minute",
on_breach=admin_limit,
methods=["POST"],
)
def admin():
"""Admin authentication.
@ -170,33 +184,19 @@ def admin():
form = AdminAuthenticationForm()
goto = request.args.get("goto", url_for(".home"))
is_admin_auth_enabled = bool(current_app.config["ADMIN_PASSWORD"])
if request.method == "POST":
client_ip = request.remote_addr
if not login_throttler.is_login_allowed(client_ip):
msg = _("Too many failed login attempts, please retry later.")
form["admin_password"].errors = [msg]
return render_template(
"admin.html",
form=form,
admin_auth=True,
is_admin_auth_enabled=is_admin_auth_enabled,
)
if form.validate():
# Valid password
if check_password_hash(
current_app.config["ADMIN_PASSWORD"], form.admin_password.data
):
session["is_admin"] = True
session.update()
login_throttler.reset(client_ip)
return redirect(goto)
# Invalid password
login_throttler.increment_attempts_counter(client_ip)
msg = _(
"This admin password is not the right one. Only %(num)d attempts left.",
num=login_throttler.get_remaining_attempts(client_ip),
)
form["admin_password"].errors = [msg]
if request.method == "POST" and form.validate():
# Valid password
if check_password_hash(
current_app.config["ADMIN_PASSWORD"], form.admin_password.data
):
session["is_admin"] = True
session.update()
return redirect(goto)
msg = _(
"This admin password is not the right one. Only %(num)d attempts left.",
num=limiter.current_limit.remaining,
)
form["admin_password"].errors = [msg]
return render_template(
"admin.html",
form=form,

View file

@ -30,6 +30,7 @@ install_requires =
email_validator>=1.0,<2
Flask-Babel>=1.0,<3
Flask-Cors>=3.0.8,<4
Flask-Limiter>=2.6,<3
Flask-Mail>=0.9.1,<1
Flask-Migrate>=2.5.3,<4 # Not following semantic versioning (e.g. https://github.com/miguelgrinberg/flask-migrate/commit/1af28ba273de6c88544623b8dc02dd539340294b)
Flask-RESTful>=0.3.9,<1