import csv import io import pytest from datetime import datetime, timedelta from stor.auth.models import Group, User from stor.repository.definitions import FundingType from stor.repository.models import FundingProject DATE_FORMAT = "%Y-%m-%d %H:%M:%S.%f" @pytest.mark.parametrize( ("username", "message"), ( ("", "Access Denied"), ("vera", "Email not verified"), ("ina", "Account not activated"), ("maya", "Access Denied"), ("una", "Access Denied"), ("gobnait", "Access Denied"), ("steph", "Access Denied"), ("reeve", "Access Denied"), ), ) def test_get_unauthenticated_and_unauthorized(client, auth, username, message): if username: auth.login(username) endpoints = [ "/groups", "/users", "/users/4", "/users/search?q=gobnait", "/users/4/edit", "/users/4/delete", "/users/report", "/projects", "/projects/create", "/projects/1/edit", "/projects/1/delete", ] for endpoint in endpoints: assert bytes(message, "utf8") in client.get("/admin{0}".format(endpoint), follow_redirects=True).data def test_list_groups_get_authorized(client, auth): auth.login("ada") response = client.get("/admin/groups") assert response.status_code == 200 assert b"admin" in response.data assert b"reviewer" in response.data assert b"staff" in response.data assert b"standard" in response.data ALL_NAMES = ["Ada", "Gobnait", "Ina", "Leo", "Maya", "Reeve", "Stan", "Steph", "Úna", "Vera"] @pytest.mark.parametrize( ("request_arg", "expected_names"), ( ("", ["Ada", "Gobnait", "Ina", "Leo"]), ("?page=1", ["Ada", "Gobnait", "Ina", "Leo"]), ("?page=2", ["Maya", "Reeve", "Stan", "Steph"]), ("?page=3", ["Úna", "Vera"]), ("?page=4", []), ), ) def test_list_users_get_authorized(client, auth, request_arg, expected_names): auth.login("ada") response = client.get("/admin/users{0}".format(request_arg)) assert response.status_code == 200 for name in ALL_NAMES: assert (bytes(name, "utf8") in response.data) == (name in expected_names) assert bytes("Total users: 10", "utf8") in response.data @pytest.mark.parametrize( ("request_args", "es_query", "es_total", "es_results", "page_no", "expected_names"), ( ("?q=zzzzz", "zzzzz", 0, [], 1, []), ("?q=gobnait", "gobnait", 1, [4], 1, ["Gobnait"]), ("?q=gobnait&page=2", "gobnait", 1, [], 2, []), ("?q=gobnait&page=0", "gobnait", 1, [4], 1, ["Gobnait"]), ("?q=gobnait&page=-1", "gobnait", 1, [4], 1, ["Gobnait"]), ("?q=reeve@test.com", "reeve@test.com", 1, [2], 1, ["Reeve"]), ("?q=Úna", "Úna", 1, [6], 1, ["Úna"]), ("?q=ford", "ford", 1, [3], 1, ["Stan"]), ("?q=University", "University", 2, [1, 2], 1, ["Ada", "Reeve"]), ("?q=Úna", "Úna", 1, [6], 1, ["Úna"]), ("?q=sunshine", "sunshine", 7, [3, 4, 5, 6, 7], 1, ["Stan", "Gobnait", "Leo", "Úna", "Steph"]), ("?q=sunshine&page=1", "sunshine", 7, [3, 4, 5, 6, 7], 1, ["Stan", "Gobnait", "Leo", "Úna", "Steph"]), ("?q=sunshine&page=2", "sunshine", 7, [8, 9], 2, ["Vera", "Ina"]), ("?q=sunshine&page=3", "sunshine", 7, [], 3, []), ), ) def test_search_users(app, client, auth, request_args, es_query, es_total, es_results, page_no, expected_names): auth.login("ada") app.elasticsearch.search.return_value = { "hits": { "total": {"value": es_total}, "hits": [{"_id" : r} for r in es_results] } } response = client.get("/admin/users/search{0}".format(request_args), follow_redirects=True) assert response.status_code == 200 for name in ALL_NAMES: assert (bytes(name, "utf8") in response.data) == (name in expected_names) assert bytes("{0} user(s) found (of 10 total)".format(es_total), "utf8") in response.data expected_search_query = { "multi_match": { "query": es_query, "fields": ["username", "email", "first_name", "last_name", "organization_name"] } } page_size = app.config["USERS_PER_PAGE"] from_ = (page_no - 1) * page_size app.elasticsearch.search.assert_called_once_with(index="users", query=expected_search_query, from_=from_, size=page_size) @pytest.mark.parametrize( ("user_id", "expected_visible", "expected_email", "expected_text"), ( (2, True, "reeve@test.com", "Reeve Ewing, University of Rain"), (4, True, "gobnait@test.com", "Gobnait Ní Mhurchú, Dept of Sunshine"), (0, False, "@test.com", "Not Found"), ), ) def test_detail_user_get_authorized(client, auth, user_id, expected_visible, expected_email, expected_text): auth.login("ada") response = client.get("/admin/users/{0}".format(user_id)) assert response.status_code == 200 assert (bytes("User Detail", "utf8") in response.data) == expected_visible assert (bytes(expected_email, "utf8") in response.data) == expected_visible assert bytes(expected_text, "utf8") in response.data @pytest.mark.parametrize( ("user_id", "expected_visible", "expected_text"), ( (1, True, "ada@test.com"), (2, True, "reeve@test.com"), (3, True, "stan@test.com"), (4, True, "gobnait@test.com"), (5, True, "leo@test.com"), (6, True, "una@test.com"), (7, True, "steph@test.com"), (8, True, "vera@test.com"), (9, True, "ina@test.com"), (0, False, "Not Found"), ), ) def test_edit_user_get_authorized(client, auth, user_id, expected_visible, expected_text): auth.login("ada") response = client.get("/admin/users/{0}/edit".format(user_id)) assert response.status_code == 200 assert (bytes("Edit User", "utf8") in response.data) == expected_visible assert bytes(expected_text, "utf8") in response.data @pytest.mark.parametrize( ("user_id", "first_name", "last_name", "organization_name", "position_in_organization", "activated", "password_rotated", "new_groups"), ( (2, "A", "A", "O1", "P1", True, True, [4]), (3, "B", "B", "O2", "P2", "", True, [3, 4]), (4, "C", "C", "O3", "P3", True, True, [1, 2, 3, 4]), (5, "D", "D", "O4", "", "", True, []), (9, "E", "E", "O5", "P5", True, True, [4]), (9, "E", "E", "O5", "P5", True, "", [4]), (10, "F", "F", "O6", "P6", True, "", [4]), ), ) def test_edit_user_post_authorized_valid(app, client, auth, user_id, first_name, last_name, organization_name, position_in_organization, activated, password_rotated, new_groups): auth.login("ada") data = { "first_name" : first_name, "last_name" : last_name, "organization_name" : organization_name, "position_in_organization" : position_in_organization, "activated" : activated, "password_rotated" : password_rotated, "groups" : new_groups, } response = client.post("/admin/users/{0}/edit".format(user_id), data=data, follow_redirects=True) assert response.status_code == 200 assert response.history[0].status_code == 302 with app.app_context(): user = User.query.filter_by(id=user_id).first() assert user is not None assert user.first_name == first_name assert user.last_name == last_name assert user.organization_name == organization_name assert user.position_in_organization == position_in_organization assert user.activated == bool(activated) assert user.password_rotated == bool(password_rotated) for i in range(1, 5): group = Group.query.filter_by(id=i).first() if i in new_groups: assert group in user.groups else: assert group not in user.groups expected_document = { "username": user.username, "email": user.email, "first_name": user.first_name, "last_name": user.last_name, "organization_name": user.organization_name, } app.elasticsearch.index.assert_any_call(index="users", id=user_id, document=expected_document) @pytest.mark.parametrize( ("user_id", "first_name", "last_name", "organization_name", "activated", "password_rotated", "new_groups", "expected_groups", "message"), ( (0, "A", "A", "O1", "", "", [4], [], b"Not Found"), (2, "", "A", "O1", True, True, [4], [2, 3, 4], b"Please fill in this field."), # Missing first name (2, "A", "", "O1", True, True, [4], [2, 3, 4], b"Please fill in this field."), # Missing last name (2, "A", "A", "", True, True, [4], [2, 3, 4], b"Please fill in this field."), # Missing organization name (2, "A", "A", "O1", True, True, [17], [], b""), (2, "A", "A", "O1", True, True, [17, 4], [4], b""), (1, "A", "A", "O1", True, True, [2, 3, 4], [1, 2, 3, 4], b"A user may not remove admin permissions from themselves."), (1, "A", "A", "O1", "", True, [1, 2, 3, 4], [1, 2, 3, 4], b"A user may not deactivate their own user account."), (10, "A", "A", "O1", True, True, [2, 3], [4], b"The password rotated field may only be unset."), ), ) def test_edit_user_post_authorized_invalid(app, client, auth, user_id, first_name, last_name, organization_name, activated, password_rotated, new_groups, expected_groups, message): auth.login("ada") data = { "first_name" : first_name, "last_name" : last_name, "organization_name" : organization_name, "activated" : activated, "password_rotated" : password_rotated, "groups" : new_groups, } response = client.post("/admin/users/{0}/edit".format(user_id), data=data, follow_redirects=True) assert response.status_code == 200 assert message in response.data with app.app_context(): user = User.query.filter_by(id=user_id).first() if user is not None: for i in range(1, 4): group = Group.query.filter_by(id=i).first() if i in expected_groups: assert group in user.groups else: assert group not in user.groups @pytest.mark.parametrize( ("user_id"), ( (6), (2), ), ) def test_delete_user_post_authorized_valid(app, client, auth, user_id): auth.login("ada") data = { "confirm" : True, } response = client.post("/admin/users/{0}/delete".format(user_id), data=data, follow_redirects=True) assert response.status_code == 200 assert response.history[0].status_code == 302 with app.app_context(): assert User.query.filter_by(id=user_id).first() is None app.elasticsearch.delete.assert_any_call(index="users", id=user_id) @pytest.mark.parametrize( ("user_id", "message"), ( (7, b"This user is the owner of or contact for one or more resources."), (4, b"This user is the owner of or contact for one or more resources."), (1, b"A user may not delete their own user account."), ), ) def test_delete_user_post_authorized_invalid(app, client, auth, user_id, message): auth.login("ada") data = { "confirm" : True, } response = client.post("/admin/users/{0}/delete".format(user_id), data=data, follow_redirects=True) assert response.status_code == 200 assert response.history[0].status_code == 302 assert message in response.data with app.app_context(): assert User.query.filter_by(id=user_id).first() is not None def test_report_users_get_valid(client, auth): auth.login("ada") response = client.get("/admin/users/report") assert response.status_code == 200 reader = csv.DictReader(io.StringIO(response.data.decode("utf-8"))) rows = list(reader) assert len(rows) == 10 assert int(rows[0]["id"]) == 1 assert rows[0]["username"] == "ada" assert rows[0]["email"] == "ada@test.com" assert rows[0]["email_verified"] == "True" assert rows[0]["activated"] == "True" assert rows[0]["password_rotated"] == "True" assert((datetime.utcnow() - datetime.strptime(rows[0]["joined"], DATE_FORMAT)) < timedelta(seconds=2)) assert((datetime.utcnow() - datetime.strptime(rows[0]["last_seen"], DATE_FORMAT)) < timedelta(seconds=2)) assert rows[0]["first_name"] == "Ada" assert rows[0]["last_name"] == "Min" assert rows[0]["organization_name"] == "University of Rain" assert rows[0]["organization_address"] == "" assert rows[0]["organization_phone"] == "" assert rows[0]["position_in_organization"] == "" assert int(rows[0]["no_of_resources"]) == 0 assert int(rows[1]["id"]) == 2 assert rows[1]["username"] == "reeve" assert rows[1]["email"] == "reeve@test.com" assert rows[1]["email_verified"] == "True" assert rows[1]["activated"] == "True" assert rows[1]["password_rotated"] == "True" assert((datetime.utcnow() - datetime.strptime(rows[1]["joined"], DATE_FORMAT)) < timedelta(seconds=2)) assert((datetime.utcnow() - datetime.strptime(rows[1]["last_seen"], DATE_FORMAT)) < timedelta(seconds=2)) assert rows[1]["first_name"] == "Reeve" assert rows[1]["last_name"] == "Ewing" assert rows[1]["organization_name"] == "University of Rain" assert rows[1]["organization_address"] == "" assert rows[1]["organization_phone"] == "" assert rows[1]["position_in_organization"] == "" assert int(rows[1]["no_of_resources"]) == 0 assert int(rows[2]["id"]) == 3 assert rows[2]["username"] == "stan" assert rows[2]["email"] == "stan@test.com" assert rows[2]["email_verified"] == "True" assert rows[2]["activated"] == "True" assert rows[2]["password_rotated"] == "True" assert((datetime.utcnow() - datetime.strptime(rows[2]["joined"], DATE_FORMAT)) < timedelta(seconds=2)) assert((datetime.utcnow() - datetime.strptime(rows[2]["last_seen"], DATE_FORMAT)) < timedelta(seconds=2)) assert rows[2]["first_name"] == "Stan" assert rows[2]["last_name"] == "Ford" assert rows[2]["organization_name"] == "Dept of Sunshine" assert rows[2]["organization_address"] == "" assert rows[2]["organization_phone"] == "" assert rows[2]["position_in_organization"] == "" assert int(rows[2]["no_of_resources"]) == 0 assert int(rows[3]["id"]) == 4 assert rows[3]["username"] == "gobnait" assert rows[3]["email"] == "gobnait@test.com" assert rows[3]["email_verified"] == "True" assert rows[3]["activated"] == "True" assert rows[3]["password_rotated"] == "True" assert((datetime.utcnow() - datetime.strptime(rows[3]["joined"], DATE_FORMAT)) < timedelta(seconds=2)) assert((datetime.utcnow() - datetime.strptime(rows[3]["last_seen"], DATE_FORMAT)) < timedelta(seconds=2)) assert rows[3]["first_name"] == "Gobnait" assert rows[3]["last_name"] == "Ní Mhurchú" assert rows[3]["organization_name"] == "Dept of Sunshine" assert rows[3]["organization_address"] == "123 Department Street" assert rows[3]["organization_phone"] == "(01)1234567" assert rows[3]["position_in_organization"] == "Officer" assert int(rows[3]["no_of_resources"]) == 5 assert int(rows[4]["id"]) == 5 assert rows[4]["username"] == "leo" assert rows[4]["email"] == "leo@test.com" assert rows[4]["email_verified"] == "True" assert rows[4]["activated"] == "True" assert rows[4]["password_rotated"] == "True" assert((datetime.utcnow() - datetime.strptime(rows[4]["joined"], DATE_FORMAT)) < timedelta(seconds=2)) assert((datetime.utcnow() - datetime.strptime(rows[4]["last_seen"], DATE_FORMAT)) < timedelta(seconds=2)) assert rows[4]["first_name"] == "Leo" assert rows[4]["last_name"] == "Cat" assert rows[4]["organization_name"] == "Dept of Sunshine" assert rows[4]["organization_address"] == "" assert rows[4]["organization_phone"] == "" assert rows[4]["position_in_organization"] == "" assert int(rows[4]["no_of_resources"]) == 2 assert int(rows[5]["id"]) == 6 assert rows[5]["username"] == "una" assert rows[5]["email"] == "una@test.com" assert rows[5]["email_verified"] == "True" assert rows[5]["activated"] == "True" assert rows[5]["password_rotated"] == "True" assert((datetime.utcnow() - datetime.strptime(rows[5]["joined"], DATE_FORMAT)) < timedelta(seconds=2)) assert((datetime.utcnow() - datetime.strptime(rows[5]["last_seen"], DATE_FORMAT)) < timedelta(seconds=2)) assert rows[5]["first_name"] == "Úna" assert rows[5]["last_name"] == "Ssigned" assert rows[5]["organization_name"] == "Dept of Sunshine" assert rows[5]["organization_address"] == "" assert rows[5]["organization_phone"] == "" assert rows[5]["position_in_organization"] == "" assert int(rows[5]["no_of_resources"]) == 0 assert int(rows[6]["id"]) == 7 assert rows[6]["username"] == "steph" assert rows[6]["email"] == "steph@test.com" assert rows[6]["email_verified"] == "True" assert rows[6]["activated"] == "True" assert rows[6]["password_rotated"] == "True" assert((datetime.utcnow() - datetime.strptime(rows[6]["joined"], DATE_FORMAT)) < timedelta(seconds=2)) assert((datetime.utcnow() - datetime.strptime(rows[6]["last_seen"], DATE_FORMAT)) < timedelta(seconds=2)) assert rows[6]["first_name"] == "Steph" assert rows[6]["last_name"] == "Anie" assert rows[6]["organization_name"] == "Dept of Sunshine" assert rows[6]["organization_address"] == "" assert rows[6]["organization_phone"] == "" assert rows[6]["position_in_organization"] == "" assert int(rows[6]["no_of_resources"]) == 3 assert int(rows[7]["id"]) == 8 assert rows[7]["username"] == "vera" assert rows[7]["email"] == "vera@test.com" assert rows[7]["email_verified"] == "False" assert rows[7]["activated"] == "False" assert rows[7]["password_rotated"] == "True" assert((datetime.utcnow() - datetime.strptime(rows[7]["joined"], DATE_FORMAT)) < timedelta(seconds=2)) assert((datetime.utcnow() - datetime.strptime(rows[7]["last_seen"], DATE_FORMAT)) < timedelta(seconds=2)) assert rows[7]["first_name"] == "Vera" assert rows[7]["last_name"] == "Fication-Pending" assert rows[7]["organization_name"] == "Dept of Sunshine" assert rows[7]["organization_address"] == "" assert rows[7]["organization_phone"] == "" assert rows[7]["position_in_organization"] == "" assert int(rows[7]["no_of_resources"]) == 0 assert int(rows[8]["id"]) == 9 assert rows[8]["username"] == "ina" assert rows[8]["email"] == "ina@test.com" assert rows[8]["email_verified"] == "True" assert rows[8]["activated"] == "False" assert rows[8]["password_rotated"] == "True" assert((datetime.utcnow() - datetime.strptime(rows[8]["joined"], DATE_FORMAT)) < timedelta(seconds=2)) assert((datetime.utcnow() - datetime.strptime(rows[8]["last_seen"], DATE_FORMAT)) < timedelta(seconds=2)) assert rows[8]["first_name"] == "Ina" assert rows[8]["last_name"] == "Ctive" assert rows[8]["organization_name"] == "Dept of Sunshine" assert rows[8]["organization_address"] == "" assert rows[8]["organization_phone"] == "" assert rows[8]["position_in_organization"] == "" assert int(rows[8]["no_of_resources"]) == 0 assert int(rows[9]["id"]) == 10 assert rows[9]["username"] == "maya" assert rows[9]["email"] == "maya@test.com" assert rows[9]["email_verified"] == "True" assert rows[9]["password_rotated"] == "False" assert rows[9]["activated"] == "True" assert((datetime.utcnow() - datetime.strptime(rows[9]["joined"], DATE_FORMAT)) < timedelta(seconds=2)) assert((datetime.utcnow() - datetime.strptime(rows[9]["last_seen"], DATE_FORMAT)) < timedelta(seconds=2)) assert rows[9]["first_name"] == "Maya" assert rows[9]["last_name"] == "Grated-User" assert rows[9]["organization_name"] == "Dept of Sunshine" assert rows[9]["organization_address"] == "" assert rows[9]["organization_phone"] == "" assert rows[9]["position_in_organization"] == "" assert int(rows[9]["no_of_resources"]) == 0 def test_list_projects_authorized(client, auth): auth.login("ada") response = client.get("/admin/projects", follow_redirects=True) assert b"Funding Project A" in response.data assert b"Funding Project B" in response.data assert b"Funding Project C" in response.data def test_create_project_authorized_valid(app, client, auth): auth.login("ada") project_id = 5 data = { "name" : "Project Name", "short_name" : "Project Short Name", "url" : "http://www.test.com", "funding_type" : FundingType.NATIONAL_FUNDS.name, "funder" : "Dept of Sunshine", "funding_country" : "IE", } response = client.post("/admin/projects/create", data=data, follow_redirects=True) assert response.status_code == 200 assert response.history[0].status_code == 302 with app.app_context(): project = FundingProject.query.filter_by(id=project_id).first() assert project.name == "Project Name" assert project.short_name == "Project Short Name" assert project.url == "http://www.test.com" assert project.funding_type == FundingType.NATIONAL_FUNDS assert project.funder == "Dept of Sunshine" assert project.funding_country == "IE" @pytest.mark.parametrize( ("new_name", "new_funding_type", "new_funding_country", "message"), ( ("", FundingType.NATIONAL_FUNDS.name, "IE", b"Please fill in this field."), ("Funding Project A", FundingType.NATIONAL_FUNDS.name, "IE", b"A project with this name already exists."), ("New Name", "UNKNOWN", "IE", b"Not a valid choice."), ("New Name", "", "IE", b"Not a valid choice."), ("New Name", FundingType.NATIONAL_FUNDS.name, "UNKNOWN", b"Not a valid choice."), ), ) def test_create_project_authorized_invalid(app, client, auth, new_name, new_funding_type, new_funding_country, message): auth.login("ada") project_id = 5 data = { "name" : new_name, "short_name" : "Project Short Name", "url" : "http://www.test.com", "funding_type" : new_funding_type, "funder" : "New Funder", "funding_country" : new_funding_country, } response = client.post("/admin/projects/create", data=data, follow_redirects=True) assert response.status_code == 200 assert len(response.history) == 0 assert message in response.data with app.app_context(): assert FundingProject.query.filter_by(id=project_id).first() == None @pytest.mark.parametrize( ("new_name", "new_short_name", "new_funding_type", "new_funding_country"), ( ("Funding Project A", "FPA", FundingType.NATIONAL_FUNDS, "IE"), ("updated name", "updated short name", FundingType.OWN_FUNDS, "SK"), ("updated name", "", FundingType.OTHER, "EU"), ), ) def test_edit_project_authorized_valid(app, client, auth, new_name, new_short_name, new_funding_type, new_funding_country): auth.login("ada") project_id = 1 data = { "name" : new_name, "short_name" : new_short_name, "url" : "http://www.test.com", "funding_type" : new_funding_type.name, "funder" : "Dept of Sunshine", "funding_country" : new_funding_country, } response = client.post("/admin/projects/{0}/edit".format(project_id), data=data, follow_redirects=True) assert response.status_code == 200 assert response.history[0].status_code == 302 with app.app_context(): project = FundingProject.query.filter_by(id=project_id).first() assert project.name == new_name assert project.short_name == new_short_name assert project.url == "http://www.test.com" assert project.funding_type == new_funding_type assert project.funder == "Dept of Sunshine" assert project.funding_country == new_funding_country @pytest.mark.parametrize( ("new_name", "new_funding_type", "new_funding_country", "message"), ( ("", FundingType.OWN_FUNDS.name, "EU", b"Please fill in this field."), ("Funding Project B", FundingType.OWN_FUNDS.name, "EU", b"A project with this name already exists."), ("New Name", "UNKNOWN", "EU", b"Not a valid choice."), ("New Name", "", "EU", b"Not a valid choice."), ("New Name", FundingType.OWN_FUNDS.name, "UNKNOWN", b"Not a valid choice."), ), ) def test_edit_project_authorized_invalid(app, client, auth, new_name, new_funding_type, new_funding_country, message): auth.login("ada") data = { "name" : new_name, "short_name" : "fp2", "url" : "http://www.test.com", "funding_type" : new_funding_type, "funder" : "New Funder", "funding_country" : new_funding_country, } response = client.post("/admin/projects/1/edit", data=data, follow_redirects=True) assert response.status_code == 200 assert len(response.history) == 0 assert message in response.data with app.app_context(): project = FundingProject.query.filter_by(id=1).first() assert project.name == "Funding Project A" assert project.short_name == "FPA" assert project.url == None assert project.funding_type == FundingType.NATIONAL_FUNDS assert project.funder == None def test_delete_project_authorized_valid(app, client, auth): auth.login("ada") project_id = 4 data = { "confirm" : True, } response = client.post("/admin/projects/{0}/delete".format(project_id), data=data, follow_redirects=True) assert response.status_code == 200 assert response.history[0].status_code == 302 with app.app_context(): assert FundingProject.query.filter_by(id=project_id).first() is None def test_delete_project_authorized_invalid(app, client, auth): auth.login("ada") project_id = 1 data = { "confirm" : True, } response = client.post("/admin/projects/{0}/delete".format(project_id), data=data, follow_redirects=True) assert response.status_code == 200 assert response.history[0].status_code == 302 assert b"This project cannot be deleted because there are resources associated with it." in response.data with app.app_context(): assert FundingProject.query.filter_by(id=project_id).first() is not None