Creating the User Schema and Endpoints

14 May 2020

Before we get to today’s schemas and endpoints, we need to talk about validation. I spent some time researching API validation and ultimately took most of my inspiration from this article by Miguel Grinberg. My passwords are hashed with a custom_app_context object, which is done in a class function.

# function to hash password
def hash_password(self, password):
    self.hashed_password = pwd_context.encrypt(password)

# function to verify password
def verify_password(self, password):
    print(self.hashed_password)
    return pwd_context.verify(password, self.hashed_password)

Additionally, I created methods for creating and authenticating a token, as recommended by the article. This way the user’s credentials are not sent back and forth with every API request.

# generate a secure token for authentication
def generate_auth_token(self, expiration=600):
    s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
    return s.dumps({'id': self.id}).decode("utf-8")

# verify a token
@staticmethod
def verify_auth_token(token):
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    except SignatureExpired:
        return None  # valid token, expired
    except BadSignature:
        return None  # invalid token

    user = User.query.get(data['id'])
    return user

I imported the flask_httpauth package and initialized an HTTPBasicAuth object in my __init__.py folder. Then I created the verify_password function.

# verifying password and email
@auth.verify_password
def verify_password(username_or_token, password, needs_valid_email=True):
    print(username_or_token, password)
    # first try to authenticate by token
    user = User.verify_auth_token(username_or_token)
    if not user:
        # try to authenticate with username, password
        user = User.query.filter_by(email=username_or_token).first()
        if not user or not user.verify_password(password):
            return False
        if needs_valid_email and not user.email_validated:
            raise InvalidUsage("Email not validated.", 401)
    g.user = user
    return True

This function first tries to authenticate by token, and then by username and password if token authentication fails. It’s a pretty straight rewrite of Miguel Grinberg’s example, with one difference: I added a needs_valid_email check to determine if the user had validated their email. This was enabled on default, but I needed an option to disable it for specific instances, which I’ll detail later in this post. The user.email_validated variable is a boolean that I added to the User model.

With the authentication written, it was time to define my schema.

# schema that returns/validates a user
class UserSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = User
        include_fk = True

    # return a user
    @post_load
    def return_user(self, data, ** kwargs):
        existing_user = User.query.filter_by(email=data["email"]).first()
        if existing_user:
            return existing_user
        else:
            return User(** data)

Again, this schema is extremely simple and relies on the SQLAlchemyAutoSchema class to generate. The only piece I added on was a post_load call that checked if the user was in the system, and returned that specific user if so. If not, it reaturned a new user. I queried the users using the email attribute, as it is guaranteed to be present (unlike the id and also unique).

Additionally, I added a second schema, specifically for the creation of new Users. I chose to do it this way because I wanted to store the logic of hashing the password in the schema, rather than doing so in the actual route. This also allowed me to validate a “password” field to ensure that the user’s password was up to security standards before hashing. I used a post_load decorator to hash the password and a validates decorator to validate the password.

# schema for creating a new user
class CreateUserSchema(ma.Schema):
    email = fields.Str(required=True)
    password = fields.Str(required=True)
    access_level = fields.Int(default=0)

    @post_load
    def return_new_user(self, data, ** kwargs):
        data["hashed_password"] = data.pop("password")  # prevent typeerror when creating User
        new_user = User(** data)
        new_user.hash_password(new_user.hashed_password)
        return new_user

    @validates('password')
    def validate_password(self, password):
        if len(password) < 3:
            raise ValidationError("Password is too short!")
        if len(password) > 20:
            raise ValidationError("Password is too long!")
        if password.isalpha():
            raise ValidationError("Password must contain at least one number and one symbol!")
        if password.isdigit():
            raise ValidationError("Password must contain at least one letter!")
        required_characters = ['!', '@', "#", "$", "%", "^", "&", "* ", "+", "=", "?"]
        if not any(char in password for char in required_characters):
            raise ValidationError(f"Password must contain one of the following characters: {required_characters}")

        return True

Also note that access_level defaults to 0 right now. I’m still working out the best way to structure user levels of access, but tentatively I’m thinking that 2 is an admin (and would only use my specific account), whereas 0 and 1 are either guest accounts or unverified emails. Frankly, I’m not entirely wedded to the three-tiered system, as you’ll see later in this post.

The Endpoints

The endpoints follow REST standards: there is a “GET” for both collections and single users, a “POST” for new users, a “DELETE” for single users, and a “PUT” for changing user information, such as password and email.

The two “GET” routes and the “DELETE” route are the simplest, and are extremely similar to my other resource routes of the same type:

# return all users, or a filtered list
@user.route("/users", methods=["GET"])
@auth.login_required
def get_users():
    all_users = User.query.all()
    return jsonify(users_schema.dump(all_users))


# get a specific user
@user.route("/users/<int:id_>", methods=["GET"])
def get_user(id_):
    cur_user = User.query.get(id_)
    if not cur_user:
        raise NotFoundException("user", id_)
    return jsonify(user_schema.dump(cur_user))


# delete a user
@user.route("/users/<int:id_>", methods=["DELETE"])
def delete_user(id_):
    cur_user = User.query.get(id_)
    if not cur_user:
        raise NotFoundException("user", id_)
    db.session.delete(cur_user)
    db.session.commit()

    return ("", 204)

My “POST” request for new users makes use of the CreateUserSchema, and I catch any ValidationErrors to send back to the client as a payload.

# post a new user
@user.route("/users", methods=["POST"])
def post_user():
    new_user_json = request.json.get("user")
    if not new_user_json:
        raise InvalidUsage("You must provide a 'user' key to post a new user.")
    try:
        new_user = create_user_schema.load(new_user_json)
        db.session.add(new_user)
        db.session.commit()
        return user_schema.dump(new_user), 201
    except ValidationError as error:
        raise InvalidUsage("Your user data was not formatted correctly.", payload=error.messages)

My “PUT” request went through several iterations before I realized that I wasn’t complying with proper RESTful standards by allowing the user to change only part of the data. I rewrote it so that all of the information about the user must be provided for every “PUT” request. In order to make sure that the new information was valid, I reused my CreateUserSchema, which validated the password for me and checked that all information was there. It then returns a new user, and I transfer the data from that user to my old user (to preserve the ID).

# change user information
@user.route("/users/<int:id_>", methods=["PUT"])
@auth.login_required
def change_user_info(id_):
    updated_user = User.query.get(id_)
    if not updated_user:
        raise NotFoundException("user", id_)
    if updated_user.id != g.user.id:
        raise InvalidUsage("You don't have permission to edit accounts that aren't yours.", 401)
    updated_information = request.json.get("user")
    if not updated_information:
        raise InvalidUsage("You must provide information with a 'user' label.")
    try:
        updated_information_user = create_user_schema.load(updated_information)
    except ValidationError as error:
        raise InvalidUsage("Your user data was not formatted correctly.", payload=error.messages)

    updated_user.email = updated_information_user.email
    updated_user.hashed_password = updated_information_user.hashed_password
    updated_user.access_level = updated_information_user.access_level

    try:
        db.session.commit()
    except IntegrityError:
        raise InvalidUsage("That email is already in use.")
    return jsonify(user_schema.dump(updated_user))

In addition to the basic endpoints, I also implemented three additional ones for verifying email and resources. The first simply generates a token using the method that I created in my Users model.

# get a token for a user
@user.route("/users/token")
@auth.login_required
def get_auth_token():
    token = g.user.generate_auth_token()
    return jsonify({'token': token.decode('ascii')})

The next one sends an email to the user’s provided email account in order to validate it. This one was a bit tricker for several reasons. One, my auth.login_required function checked if a user’s email account was verified and denied permission if it was not. Therefore, I couldn’t use it to validate the user’s credentials. I solved this by manually verifying in the function, using my verify_password funciton with the needs_valid_email setting set to false.

The second issue was the route. In my previous example, the email verification sent a link to the email that, when clicked on, redirected to the app and valided the email of the user in the provided token. However, I couldn’t do that this time, because I did not want any user interaction with the backend.

My solution was partial, and won’t be complete until I can work on the frontend. Essentially, the /users/verification route requires a url argument, in addition to login credentials. This will be the route to the frontend app. The backend sends the email to the user, which directs the user to the frontend page. The frontend then informs the backend that the user is successfully verified.

# email a user with the validation route, provided by the client
def send_validate_email(user, route):
    token = user.generate_auth_token(expiration=2000)
    print("token:", token)
    msg = Message('Verify Your Email', sender='groceryapp@gmail.com', recipients=[user.email])
    msg.body = f'''To verify your email, please visit this link: {route}/{token}.'''
    mail.send(msg)

    return token


# send a verification email
@user.route("/users/verification", methods=["GET"])
def send_verify_email():
    # not using decorator because email is not yet validated
    username = request.authorization["username"]
    password = request.authorization["password"]
    verify_password(username, password, needs_valid_email=False)

    url_to_send = request.args.get("url")
    if not url_to_send:
        raise InvalidUsage("You must provide a client-side url for the verification route")
    token = send_validate_email(g.user, url_to_send)
    return jsonify({"token": token})


# receive verification confirmation
@user.route("/users/verification", methods=["PUT"])
def verify_email():
    print("verifying email")
    token = request.json.get("token")
    user = User.verify_auth_token(token)
    if not user:
        raise InvalidUsage("Unable to get user from token.")
    user.email_validated = True
    db.session.commit()
    return jsonify(user_schema.dump(user))

It’s a bit confusing, but hopefully when I get the frontend set up, I’ll be able to revisit this and explain the process in a bit more depth.

And those are my endpoints. Before I was finished with the user implementation, however, there were a few things I needed to change in the other resources. The addition of Users meant that I needed permissions on who got to edit GroceryLists and Recipes.

New Permissions

The first thing I did was make a slight modification to how my Recipes and GroceryLists were stored. I implemented a creator_id field for both of them, which would store the id of the User who created them. Recipes would only be allowed to be modified by their creator, although anyone can use one in a GroceryList. GroceryLists will have the ability to have multiple collaborators, but I still wanted to establish the creator as a separate field. Thus, the association table became one of “editors” rather than “users.”

This required some updating of my endpoints. For my Recipes, the changes were minor: I simply added a login_required decorator in front of the “DELETE” and “POST” methods. For the “POST” method, the user’s id was added to the json data before it was passed into the RecipeSchema. For the “DELETE” method, the user’s login information is compared with the creator’s ID (to ensure that only the creator can delete a recipe). If they don’t match, then a 401 response is given.

@recipe.route("/recipes", methods=["POST"])
@auth.login_required
def post_recipe():
    new_recipe_data = request.json.get("recipe")
    if not new_recipe_data:
        raise InvalidUsage("You must provide a recipe to POST.")
    new_recipe_data["creator_id"] = g.user.id

    new_recipe = recipe_schema.load(request.json.get("recipe"))
    db.session.add(new_recipe)
    db.session.commit()
    return jsonify(recipe_schema.dump(new_recipe)), 201


@recipe.route("/recipes/<int:id_>", methods=["DELETE"])
@auth.login_required
def delete_recipe(id_):
    recipe_to_delete = Recipe.query.get(id_)
    if not recipe_to_delete:
        raise NotFoundException("recipe", id_, "The recipe you are trying to delete does not exist.")

    if recipe_to_delete.creator is not g.user:
        raise InvalidUsage("You are not the creator of this recipe.", 401)

    db.session.delete(recipe_to_delete)
    db.session.commit()

    return ('', 204)

For my GroceryList endpoints, things were a bit more complicated. The easy change was to the “POST” and “DELETE” methods, and were essentially the same as the new “POST” method for the recipe. For my “PUT” method, however, I needed to check both the creator and the list of editors. I also rewrote the method so that it replaced the entire contents of the list, rather than modifying it as it had before.

@grocerylist.route("/lists/<int:id_>", methods=["PUT"])
@auth.login_required
def add_to_list(id_):
    list_to_modify = GroceryList.query.get(id_)
    if not list_to_modify:
        raise NotFoundException("grocerylist", id_)
    if not (list_to_modify.creator == g.user or g.user in list_to_modify.editors):
        print(g.user, list_to_modify.editors)
        raise InvalidUsage("You don't have permission to modify this list.", 401)

    list_to_modify.clear_grocerylist()

    # add provided recipes
    recipes = request.json.get("recipes")
    if recipes:
        for recipe in recipes:
            if recipe.get("id"):
                list_to_modify.recipes.append(Recipe.query.get(recipe.get("id")))
            else:
                raise InvalidUsage("You must add a recipe by its id.")

    # add provided ingredients
    ingredients = request.json.get("ingredients")
    if ingredients:
        for ingredient in ingredients:
            if ingredient.get("name"):
                add_additional_ingredients(id_, ingredient)
                pass
            else:
                raise InvalidUsage("You must add an ingredient by its name.")

    return jsonify(grocerylist_schema.dump(list_to_modify))

Finally, I also needed a new route that would allow the creator of the list to modify who could edit it. This route takes a list of users and verifies them before replacing the old list of editors.

# set editors to a GroceryList
@grocerylist.route("/lists/<int:id_>/editors", methods=["PUT"])
@auth.login_required
def add_editors(id_):
    current_list = GroceryList.query.get(id_)
    if not current_list:
        raise NotFoundException("grocerylist", id_)
    if current_list.creator is not g.user:
        raise InvalidUsage("You are not the creator of this Grocery List", 401)
    new_editors = users_schema.load(request.json.get("users"))
    current_list.editors = new_editors
    db.session.commit()

    return jsonify(users_schema.dump(new_editors)), 201

Conclusions

And that’s it! There are still a few areas that I need to cover, such as adding in filters on my GroceryList, as well as integrating spaCy and my web scrapers. I would also like to do some refactoring, since code is repeated in several places and that’s a no-go. But the bulk of my functionality is in place now for the backend, and I’m really excited to begin work on the front. I’ve been teaching myself React in the mornings since I started this new version, and I’m really excited to bring a ton of new functionality to this thing.