mirror of
https://github.com/spiral-project/ihatemoney.git
synced 2025-04-28 17:32:38 +02:00
Protect admin endpoints against brute force attacks (#249)
* Protect admin endpoints against brute force attacks Add a throttling mechanism to prevent a client brute forcing the authentication form, based on its ip address Closes #245 * Reset attempt counters if they get memory hungry
This commit is contained in:
parent
68e4114735
commit
ec4a099f18
6 changed files with 93 additions and 9 deletions
|
@ -7,6 +7,7 @@ from flask_babel import Babel
|
|||
from flask_mail import Mail
|
||||
from flask_migrate import Migrate, upgrade, stamp
|
||||
from raven.contrib.flask import Sentry
|
||||
from werkzeug.contrib.fixers import ProxyFix
|
||||
|
||||
from ihatemoney.api import api
|
||||
from ihatemoney.models import db
|
||||
|
@ -104,6 +105,11 @@ def create_app(configuration=None, instance_path='/etc/ihatemoney',
|
|||
load_configuration(app, configuration)
|
||||
app.wsgi_app = PrefixedWSGI(app)
|
||||
|
||||
# Get client's real IP
|
||||
# Note(0livd): When running in a non-proxy setup, is vulnerable to requests
|
||||
# with a forged X-FORWARDED-FOR header
|
||||
app.wsgi_app = ProxyFix(app.wsgi_app)
|
||||
|
||||
validate_configuration(app)
|
||||
app.register_blueprint(web_interface)
|
||||
app.register_blueprint(api)
|
||||
|
|
|
@ -9,6 +9,7 @@ import os
|
|||
import json
|
||||
from collections import defaultdict
|
||||
import six
|
||||
from time import sleep
|
||||
|
||||
from werkzeug.security import generate_password_hash
|
||||
from flask import session
|
||||
|
@ -397,6 +398,28 @@ class BudgetTestCase(IhatemoneyTestCase):
|
|||
resp = self.client.post("/admin?goto=%2Fcreate", data={'admin_password': ''})
|
||||
self.assertNotIn('<a href="/create">/create</a>', resp.data.decode('utf-8'))
|
||||
|
||||
def test_login_throttler(self):
|
||||
self.app.config['ADMIN_PASSWORD'] = generate_password_hash("pass")
|
||||
|
||||
# Authenticate 3 times with a wrong passsword
|
||||
self.client.post("/admin?goto=%2Fcreate", data={'admin_password': 'wrong'})
|
||||
self.client.post("/admin?goto=%2Fcreate", data={'admin_password': 'wrong'})
|
||||
resp = self.client.post("/admin?goto=%2Fcreate", data={'admin_password': 'wrong'})
|
||||
|
||||
self.assertIn('Too many failed login attempts, please retry later.',
|
||||
resp.data.decode('utf-8'))
|
||||
# Change throttling delay
|
||||
import gc
|
||||
for obj in gc.get_objects():
|
||||
if isinstance(obj, utils.LoginThrottler):
|
||||
obj._delay = 0.005
|
||||
break
|
||||
# Wait for delay to expire and retry logging in
|
||||
sleep(1)
|
||||
resp = self.client.post("/admin?goto=%2Fcreate", data={'admin_password': 'wrong'})
|
||||
self.assertNotIn('Too many failed login attempts, please retry later.',
|
||||
resp.data.decode('utf-8'))
|
||||
|
||||
def test_manage_bills(self):
|
||||
self.post_project("raclette")
|
||||
|
||||
|
|
Binary file not shown.
|
@ -162,14 +162,18 @@ msgstr "remboursements"
|
|||
msgid "Export file format"
|
||||
msgstr "Format du fichier d'export"
|
||||
|
||||
#: web.py:95
|
||||
msgid "This admin password is not the right one"
|
||||
msgstr "Le mot de passe administrateur que vous avez entré n'est pas correct"
|
||||
|
||||
#: web.py:95
|
||||
msgid "This private code is not the right one"
|
||||
msgstr "Le code que vous avez entré n'est pas correct"
|
||||
|
||||
#: web.py:106
|
||||
msgid "This admin password is not the right one. Only %(num)d attempts left."
|
||||
msgstr "Le mot de passe administrateur que vous avez entré n'est pas correct. Plus que %(num)d tentatives."
|
||||
|
||||
#: web.py:106
|
||||
msgid "Too many failed login attempts, please retry later."
|
||||
msgstr "Trop d'échecs d'authentification successifs, veuillez réessayer plus tard."
|
||||
|
||||
#: web.py:147
|
||||
#, python-format
|
||||
msgid "You have just created '%(project)s' to share your expenses"
|
||||
|
|
|
@ -7,6 +7,7 @@ from json import dumps
|
|||
from flask import redirect
|
||||
from werkzeug.routing import HTTPException, RoutingException
|
||||
import six
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import csv
|
||||
|
||||
|
@ -131,3 +132,41 @@ def list_of_dicts2csv(dict_to_convert):
|
|||
|
||||
# base64 encoding that works with both py2 and py3 and yield no warning
|
||||
base64_encode = base64.encodestring if six.PY2 else base64.encodebytes
|
||||
|
||||
|
||||
class LoginThrottler():
|
||||
"""Simple login throttler used to limit authentication attempts based on client's ip address.
|
||||
When using multiple workers, remaining number of attempts can get inconsistent
|
||||
but will still be limited to num_workers * max_attempts.
|
||||
"""
|
||||
def __init__(self, max_attempts=3, delay=1):
|
||||
self._max_attempts = max_attempts
|
||||
# Delay in minutes before resetting the attempts counter
|
||||
self._delay = delay
|
||||
self._attempts = {}
|
||||
|
||||
def get_remaining_attempts(self, ip):
|
||||
return self._max_attempts - self._attempts.get(ip, [datetime.now(), 0])[1]
|
||||
|
||||
def increment_attempts_counter(self, ip):
|
||||
# Reset all attempt counters when they get hungry for memory
|
||||
if len(self._attempts) > 10000:
|
||||
self.__init__()
|
||||
if self._attempts.get(ip) is None:
|
||||
# Store first attempt date and number of attempts since
|
||||
self._attempts[ip] = [datetime.now(), 0]
|
||||
self._attempts.get(ip)[1] += 1
|
||||
|
||||
def is_login_allowed(self, ip):
|
||||
if self._attempts.get(ip) is None:
|
||||
return True
|
||||
# When the delay is expired, reset the counter
|
||||
if datetime.now() - self._attempts.get(ip)[0] > timedelta(minutes=self._delay):
|
||||
self.reset(ip)
|
||||
return True
|
||||
if self._attempts.get(ip)[1] >= self._max_attempts:
|
||||
return False
|
||||
return True
|
||||
|
||||
def reset(self, ip):
|
||||
self._attempts.pop(ip, None)
|
||||
|
|
|
@ -27,10 +27,12 @@ from ihatemoney.forms import (
|
|||
InviteForm, MemberForm, PasswordReminder, ProjectForm, get_billform_for,
|
||||
ExportForm
|
||||
)
|
||||
from ihatemoney.utils import Redirect303, list_of_dicts2json, list_of_dicts2csv
|
||||
from ihatemoney.utils import Redirect303, list_of_dicts2json, list_of_dicts2csv, LoginThrottler
|
||||
|
||||
main = Blueprint("main", __name__)
|
||||
|
||||
login_throttler = LoginThrottler(max_attempts=3, delay=1)
|
||||
|
||||
|
||||
def requires_admin(f):
|
||||
"""Require admin permissions for @requires_admin decorated endpoints.
|
||||
|
@ -89,14 +91,24 @@ def admin():
|
|||
form = AdminAuthenticationForm()
|
||||
goto = request.args.get('goto', url_for('.home'))
|
||||
if request.method == "POST":
|
||||
client_ip = request.remote_addr
|
||||
if not login_throttler.is_login_allowed(client_ip):
|
||||
msg = _("Too many failed login attempts, please retry later.")
|
||||
form.errors['admin_password'] = [msg]
|
||||
return render_template("authenticate.html", form=form, admin_auth=True)
|
||||
if form.validate():
|
||||
if check_password_hash(current_app.config['ADMIN_PASSWORD'], form.admin_password.data):
|
||||
# Valid password
|
||||
if (check_password_hash(current_app.config['ADMIN_PASSWORD'],
|
||||
form.admin_password.data)):
|
||||
session['is_admin'] = True
|
||||
session.update()
|
||||
login_throttler.reset(client_ip)
|
||||
return redirect(goto)
|
||||
else:
|
||||
msg = _("This admin password is not the right one")
|
||||
form.errors['admin_password'] = [msg]
|
||||
# Invalid password
|
||||
login_throttler.increment_attempts_counter(client_ip)
|
||||
msg = _("This admin password is not the right one. Only %(num)d attempts left.",
|
||||
num=login_throttler.get_remaining_attempts(client_ip))
|
||||
form.errors['admin_password'] = [msg]
|
||||
return render_template("authenticate.html", form=form, admin_auth=True)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue