Module gpycat.core

Expand source code
import io
from os import PathLike
from pathlib import Path
from typing import Any, Dict, List, Literal, Union

import requests.exceptions
from loguru import logger
from pydantic import AnyHttpUrl
from requests import Response
from requests.adapters import HTTPAdapter, Retry
from requests_toolbelt import MultipartEncoder

from .models import AlbumNode, GfyItem
from .session import GfySession

BASE_URL = "https://api.gfycat.com/v1"


class Gfycat:
    client_id: str = ""
    client_secret: str = ""
    username: str = ""
    password: str = ""
    credentials: Dict[str, Union[str, int]] = {}
    last_request_status: int = None

    @staticmethod
    def _raise_if_not_ok(response: Response, *args, **kwargs):
        if not response.ok:
            logger.error(response.text)
            raise requests.exceptions.HTTPError(response.text)

    @staticmethod
    def _update_last_status(self, response: Response, *args, **kwargs):
        self.last_request_status = response.status_code

    @staticmethod
    def _refresh_on_expire(response: Response, *args, **kwargs):
        raise NotImplemented

    def __init__(self):
        self.session = GfySession()
        self.session.hooks.update(
            {
                "response": [
                    self._raise_if_not_ok,
                    lambda *args, **kwargs: self._update_last_status(self, *args, **kwargs),
                ]
            }
        )

    @staticmethod
    def transform_content_url_key(gfy_item: dict) -> dict:
        """
        Transform keys that start with numbers so that they start with alphabetical characters instead.

        Args:
            gfy_item (dict): A pre-GfyItem response from the API.

        Returns:
            dict: A pre-GfyItem dict with fixed content URL subkeys.
        """
        return {
            **gfy_item,
            "content_urls": {
                **gfy_item["content_urls"],
                "gif100px": gfy_item["content_urls"]["100pxGif"],
            },
        }

    @staticmethod
    def transform_dashed_key(node: dict):
        for k, v in node.items():
            if "-" in k:
                undashed: List[str] = k.split("-")
                undashed = [u.capitalize() for i, u in enumerate(undashed) if i > 0]
                new_k = "".join(undashed)
                node[new_k] = v
                del node[k]
        return node

    def auth(
        self,
        *,
        client_id: str,
        client_secret: str,
        username: str = "",
        password: str = "",
        grant_type: Literal[
            "client_credentials", "password", "refresh", "convert_code", "provider_token"
        ] = "client_credentials",
    ):
        data = {
            "grant_type": grant_type,
            "client_id": client_id,
            "client_secret": client_secret,
        }
        if grant_type == "client_credentials":
            pass
        elif grant_type == "password":
            if not username:
                raise ValueError("Username is blank")
            if not password:
                raise ValueError("Password is blank")
            data.update({"username": username, "password": password})
        elif grant_type == "refresh":
            data.update({"refresh_token": self.credentials["refresh_token"]})
        else:
            raise NotImplemented
        self.client_id = client_id
        self.client_secret = client_secret

        res = self.session.post(f"{BASE_URL}/oauth/token", data=data)
        data = res.json()
        self.credentials = data
        self.session.headers.update(
            {"Authorization": f"{self.credentials['token_type']} {self.credentials['access_token']}"}
        )

        if grant_type == "refresh":
            logger.info("Refreshed token.")
        else:
            logger.info("Logged in.")

    def get_gfycat(self, gfy_id: str):
        res = self.session.get(f"{BASE_URL}/gfycats/{gfy_id}")
        data = res.json()
        return GfyItem(**self.transform_content_url_key(data["gfyItem"]))

    def new_gfycat_from_url(self, url: AnyHttpUrl):
        raise NotImplemented

    def new_gfycat_from_file(
        self,
        file: Union[str, PathLike],
        *,
        title: str = None,
        description: str = None,
        tags: List[str] = None,
        nsfw: bool = False,
        ignore_md5_check: bool = False,
        keep_audio: True,
        private: False,
    ) -> str:
        session = self.session
        file = Path(file).resolve()
        data = {
            "title": file.name if title is None else title,
            "nsfw": nsfw,
            "keepAudio": keep_audio,
            "noMd5": ignore_md5_check,
            "private": private,
        }
        if description is not None:
            data["description"] = description
        if tags is not None:
            data["tags"] = tags
        retries = Retry(total=100, backoff_factor=0.1)
        session.mount("https://", adapter=HTTPAdapter(max_retries=retries))
        res = session.post(f"{BASE_URL}/gfycats", data=data)
        metadata = res.json()
        with open(file, "rb") as f:
            new_file = io.BytesIO(f.read())
            new_file.name = metadata["gfyname"]
            new_file.seek(0)
            m = MultipartEncoder(
                fields={
                    "key": metadata["gfyname"],
                    "file": (metadata["gfyname"], new_file, "video/mp4"),
                }
            )
            res = requests.post("https://filedrop.gfycat.com", data=m, headers={"Content-Type": m.content_type})
        if res.status_code >= 400:
            raise requests.exceptions.HTTPError(res.text)
        return metadata["gfyname"]

    def check_upload_status(self, gfy_name: str):
        raise NotImplemented

    def update_gfycat(self, gfy_id: str, attribute: str, value: Any):
        self.session.put(f"{BASE_URL}/me/gfycats/{gfy_id}/{attribute}", data={"value": value})
        logger.info(f'Updated Gfycat {gfy_id} {attribute} to "{value}"')

    def delete_gfycat(self, gfy_id: str):
        self.session.delete(f"{BASE_URL}/me/gfycats/{gfy_id}")
        logger.info(f"Deleted Gfycat {gfy_id}")

    def delete_gfycat_attribute(self, gfy_id: str, attribute: str):
        self.session.delete(f"{BASE_URL}/me/gfycats/{gfy_id}/{attribute}")
        logger.info(f"Deleted Gfycat {gfy_id} {attribute}")

    def get_user_feed(self, username: str, cursor: str = None, count: int = None):
        params = {}
        if cursor is not None:
            params["cursor"] = cursor
        if count is not None:
            params["count"] = count
        res = self.session.get(f"{BASE_URL}/users/{username}/gfycats", params=params)
        data = res.json()
        next_cursor: str = data["cursor"]
        return {
            "gfycats": [GfyItem(**self.transform_content_url_key(d)) for d in data["gfycats"]],
            "cursor": next_cursor,
        }

    def get_my_feed(self, cursor: str = None):
        params = {}
        if cursor is not None:
            params["cursor"] = cursor
        res = self.session.get(f"{BASE_URL}/me/gfycats", params=params)
        data = res.json()
        next_cursor: str = data["cursor"]
        return {
            "gfycats": [GfyItem(**self.transform_content_url_key(d)) for d in data["gfycats"]],
            "cursor": next_cursor,
        }

    def get_followers_feed(self, cursor: str = None):
        params = {}
        if cursor is not None:
            params["cursor"] = cursor
        res = self.session.get(f"{BASE_URL}/me/follows/gfycats")
        data = res.json()
        next_cursor: str = data["cursor"]
        return {
            "gfycats": [GfyItem(**self.transform_content_url_key(d)) for d in data["gfycats"]],
            "cursor": next_cursor,
            "count": data["count"],
            "total_count": data["totalCount"],
        }

    def get_my_albums(self):
        res = self.session.get(f"{BASE_URL}/me/album-folders")
        data = res.json()
        return [AlbumNode(**node) for node in data[0]["nodes"]]

    def create_album(self, new_id: str, title: str, description: str = "", contents: List[str] = None):
        if contents is None:
            contents = []
        self.session.post(
            f"{BASE_URL}/me/album-folders/{new_id}",
            data=dict(
                title=title,
                description=description,
                contents=contents,
            ),
        )
        logger.info(f'Created album "{title}"')


gpycat = Gfycat()

Classes

class Gfycat
Expand source code
class Gfycat:
    client_id: str = ""
    client_secret: str = ""
    username: str = ""
    password: str = ""
    credentials: Dict[str, Union[str, int]] = {}
    last_request_status: int = None

    @staticmethod
    def _raise_if_not_ok(response: Response, *args, **kwargs):
        if not response.ok:
            logger.error(response.text)
            raise requests.exceptions.HTTPError(response.text)

    @staticmethod
    def _update_last_status(self, response: Response, *args, **kwargs):
        self.last_request_status = response.status_code

    @staticmethod
    def _refresh_on_expire(response: Response, *args, **kwargs):
        raise NotImplemented

    def __init__(self):
        self.session = GfySession()
        self.session.hooks.update(
            {
                "response": [
                    self._raise_if_not_ok,
                    lambda *args, **kwargs: self._update_last_status(self, *args, **kwargs),
                ]
            }
        )

    @staticmethod
    def transform_content_url_key(gfy_item: dict) -> dict:
        """
        Transform keys that start with numbers so that they start with alphabetical characters instead.

        Args:
            gfy_item (dict): A pre-GfyItem response from the API.

        Returns:
            dict: A pre-GfyItem dict with fixed content URL subkeys.
        """
        return {
            **gfy_item,
            "content_urls": {
                **gfy_item["content_urls"],
                "gif100px": gfy_item["content_urls"]["100pxGif"],
            },
        }

    @staticmethod
    def transform_dashed_key(node: dict):
        for k, v in node.items():
            if "-" in k:
                undashed: List[str] = k.split("-")
                undashed = [u.capitalize() for i, u in enumerate(undashed) if i > 0]
                new_k = "".join(undashed)
                node[new_k] = v
                del node[k]
        return node

    def auth(
        self,
        *,
        client_id: str,
        client_secret: str,
        username: str = "",
        password: str = "",
        grant_type: Literal[
            "client_credentials", "password", "refresh", "convert_code", "provider_token"
        ] = "client_credentials",
    ):
        data = {
            "grant_type": grant_type,
            "client_id": client_id,
            "client_secret": client_secret,
        }
        if grant_type == "client_credentials":
            pass
        elif grant_type == "password":
            if not username:
                raise ValueError("Username is blank")
            if not password:
                raise ValueError("Password is blank")
            data.update({"username": username, "password": password})
        elif grant_type == "refresh":
            data.update({"refresh_token": self.credentials["refresh_token"]})
        else:
            raise NotImplemented
        self.client_id = client_id
        self.client_secret = client_secret

        res = self.session.post(f"{BASE_URL}/oauth/token", data=data)
        data = res.json()
        self.credentials = data
        self.session.headers.update(
            {"Authorization": f"{self.credentials['token_type']} {self.credentials['access_token']}"}
        )

        if grant_type == "refresh":
            logger.info("Refreshed token.")
        else:
            logger.info("Logged in.")

    def get_gfycat(self, gfy_id: str):
        res = self.session.get(f"{BASE_URL}/gfycats/{gfy_id}")
        data = res.json()
        return GfyItem(**self.transform_content_url_key(data["gfyItem"]))

    def new_gfycat_from_url(self, url: AnyHttpUrl):
        raise NotImplemented

    def new_gfycat_from_file(
        self,
        file: Union[str, PathLike],
        *,
        title: str = None,
        description: str = None,
        tags: List[str] = None,
        nsfw: bool = False,
        ignore_md5_check: bool = False,
        keep_audio: True,
        private: False,
    ) -> str:
        session = self.session
        file = Path(file).resolve()
        data = {
            "title": file.name if title is None else title,
            "nsfw": nsfw,
            "keepAudio": keep_audio,
            "noMd5": ignore_md5_check,
            "private": private,
        }
        if description is not None:
            data["description"] = description
        if tags is not None:
            data["tags"] = tags
        retries = Retry(total=100, backoff_factor=0.1)
        session.mount("https://", adapter=HTTPAdapter(max_retries=retries))
        res = session.post(f"{BASE_URL}/gfycats", data=data)
        metadata = res.json()
        with open(file, "rb") as f:
            new_file = io.BytesIO(f.read())
            new_file.name = metadata["gfyname"]
            new_file.seek(0)
            m = MultipartEncoder(
                fields={
                    "key": metadata["gfyname"],
                    "file": (metadata["gfyname"], new_file, "video/mp4"),
                }
            )
            res = requests.post("https://filedrop.gfycat.com", data=m, headers={"Content-Type": m.content_type})
        if res.status_code >= 400:
            raise requests.exceptions.HTTPError(res.text)
        return metadata["gfyname"]

    def check_upload_status(self, gfy_name: str):
        raise NotImplemented

    def update_gfycat(self, gfy_id: str, attribute: str, value: Any):
        self.session.put(f"{BASE_URL}/me/gfycats/{gfy_id}/{attribute}", data={"value": value})
        logger.info(f'Updated Gfycat {gfy_id} {attribute} to "{value}"')

    def delete_gfycat(self, gfy_id: str):
        self.session.delete(f"{BASE_URL}/me/gfycats/{gfy_id}")
        logger.info(f"Deleted Gfycat {gfy_id}")

    def delete_gfycat_attribute(self, gfy_id: str, attribute: str):
        self.session.delete(f"{BASE_URL}/me/gfycats/{gfy_id}/{attribute}")
        logger.info(f"Deleted Gfycat {gfy_id} {attribute}")

    def get_user_feed(self, username: str, cursor: str = None, count: int = None):
        params = {}
        if cursor is not None:
            params["cursor"] = cursor
        if count is not None:
            params["count"] = count
        res = self.session.get(f"{BASE_URL}/users/{username}/gfycats", params=params)
        data = res.json()
        next_cursor: str = data["cursor"]
        return {
            "gfycats": [GfyItem(**self.transform_content_url_key(d)) for d in data["gfycats"]],
            "cursor": next_cursor,
        }

    def get_my_feed(self, cursor: str = None):
        params = {}
        if cursor is not None:
            params["cursor"] = cursor
        res = self.session.get(f"{BASE_URL}/me/gfycats", params=params)
        data = res.json()
        next_cursor: str = data["cursor"]
        return {
            "gfycats": [GfyItem(**self.transform_content_url_key(d)) for d in data["gfycats"]],
            "cursor": next_cursor,
        }

    def get_followers_feed(self, cursor: str = None):
        params = {}
        if cursor is not None:
            params["cursor"] = cursor
        res = self.session.get(f"{BASE_URL}/me/follows/gfycats")
        data = res.json()
        next_cursor: str = data["cursor"]
        return {
            "gfycats": [GfyItem(**self.transform_content_url_key(d)) for d in data["gfycats"]],
            "cursor": next_cursor,
            "count": data["count"],
            "total_count": data["totalCount"],
        }

    def get_my_albums(self):
        res = self.session.get(f"{BASE_URL}/me/album-folders")
        data = res.json()
        return [AlbumNode(**node) for node in data[0]["nodes"]]

    def create_album(self, new_id: str, title: str, description: str = "", contents: List[str] = None):
        if contents is None:
            contents = []
        self.session.post(
            f"{BASE_URL}/me/album-folders/{new_id}",
            data=dict(
                title=title,
                description=description,
                contents=contents,
            ),
        )
        logger.info(f'Created album "{title}"')

Class variables

var client_id : str
var client_secret : str
var credentials : Dict[str, Union[str, int]]
var last_request_status : int
var password : str
var username : str

Static methods

def transform_content_url_key(gfy_item: dict) ‑> dict

Transform keys that start with numbers so that they start with alphabetical characters instead.

Args

gfy_item : dict
A pre-GfyItem response from the API.

Returns

dict
A pre-GfyItem dict with fixed content URL subkeys.
Expand source code
@staticmethod
def transform_content_url_key(gfy_item: dict) -> dict:
    """
    Transform keys that start with numbers so that they start with alphabetical characters instead.

    Args:
        gfy_item (dict): A pre-GfyItem response from the API.

    Returns:
        dict: A pre-GfyItem dict with fixed content URL subkeys.
    """
    return {
        **gfy_item,
        "content_urls": {
            **gfy_item["content_urls"],
            "gif100px": gfy_item["content_urls"]["100pxGif"],
        },
    }
def transform_dashed_key(node: dict)
Expand source code
@staticmethod
def transform_dashed_key(node: dict):
    for k, v in node.items():
        if "-" in k:
            undashed: List[str] = k.split("-")
            undashed = [u.capitalize() for i, u in enumerate(undashed) if i > 0]
            new_k = "".join(undashed)
            node[new_k] = v
            del node[k]
    return node

Methods

def auth(self, *, client_id: str, client_secret: str, username: str = '', password: str = '', grant_type: Literal['client_credentials', 'password', 'refresh', 'convert_code', 'provider_token'] = 'client_credentials')
Expand source code
def auth(
    self,
    *,
    client_id: str,
    client_secret: str,
    username: str = "",
    password: str = "",
    grant_type: Literal[
        "client_credentials", "password", "refresh", "convert_code", "provider_token"
    ] = "client_credentials",
):
    data = {
        "grant_type": grant_type,
        "client_id": client_id,
        "client_secret": client_secret,
    }
    if grant_type == "client_credentials":
        pass
    elif grant_type == "password":
        if not username:
            raise ValueError("Username is blank")
        if not password:
            raise ValueError("Password is blank")
        data.update({"username": username, "password": password})
    elif grant_type == "refresh":
        data.update({"refresh_token": self.credentials["refresh_token"]})
    else:
        raise NotImplemented
    self.client_id = client_id
    self.client_secret = client_secret

    res = self.session.post(f"{BASE_URL}/oauth/token", data=data)
    data = res.json()
    self.credentials = data
    self.session.headers.update(
        {"Authorization": f"{self.credentials['token_type']} {self.credentials['access_token']}"}
    )

    if grant_type == "refresh":
        logger.info("Refreshed token.")
    else:
        logger.info("Logged in.")
def check_upload_status(self, gfy_name: str)
Expand source code
def check_upload_status(self, gfy_name: str):
    raise NotImplemented
def create_album(self, new_id: str, title: str, description: str = '', contents: List[str] = None)
Expand source code
def create_album(self, new_id: str, title: str, description: str = "", contents: List[str] = None):
    if contents is None:
        contents = []
    self.session.post(
        f"{BASE_URL}/me/album-folders/{new_id}",
        data=dict(
            title=title,
            description=description,
            contents=contents,
        ),
    )
    logger.info(f'Created album "{title}"')
def delete_gfycat(self, gfy_id: str)
Expand source code
def delete_gfycat(self, gfy_id: str):
    self.session.delete(f"{BASE_URL}/me/gfycats/{gfy_id}")
    logger.info(f"Deleted Gfycat {gfy_id}")
def delete_gfycat_attribute(self, gfy_id: str, attribute: str)
Expand source code
def delete_gfycat_attribute(self, gfy_id: str, attribute: str):
    self.session.delete(f"{BASE_URL}/me/gfycats/{gfy_id}/{attribute}")
    logger.info(f"Deleted Gfycat {gfy_id} {attribute}")
def get_followers_feed(self, cursor: str = None)
Expand source code
def get_followers_feed(self, cursor: str = None):
    params = {}
    if cursor is not None:
        params["cursor"] = cursor
    res = self.session.get(f"{BASE_URL}/me/follows/gfycats")
    data = res.json()
    next_cursor: str = data["cursor"]
    return {
        "gfycats": [GfyItem(**self.transform_content_url_key(d)) for d in data["gfycats"]],
        "cursor": next_cursor,
        "count": data["count"],
        "total_count": data["totalCount"],
    }
def get_gfycat(self, gfy_id: str)
Expand source code
def get_gfycat(self, gfy_id: str):
    res = self.session.get(f"{BASE_URL}/gfycats/{gfy_id}")
    data = res.json()
    return GfyItem(**self.transform_content_url_key(data["gfyItem"]))
def get_my_albums(self)
Expand source code
def get_my_albums(self):
    res = self.session.get(f"{BASE_URL}/me/album-folders")
    data = res.json()
    return [AlbumNode(**node) for node in data[0]["nodes"]]
def get_my_feed(self, cursor: str = None)
Expand source code
def get_my_feed(self, cursor: str = None):
    params = {}
    if cursor is not None:
        params["cursor"] = cursor
    res = self.session.get(f"{BASE_URL}/me/gfycats", params=params)
    data = res.json()
    next_cursor: str = data["cursor"]
    return {
        "gfycats": [GfyItem(**self.transform_content_url_key(d)) for d in data["gfycats"]],
        "cursor": next_cursor,
    }
def get_user_feed(self, username: str, cursor: str = None, count: int = None)
Expand source code
def get_user_feed(self, username: str, cursor: str = None, count: int = None):
    params = {}
    if cursor is not None:
        params["cursor"] = cursor
    if count is not None:
        params["count"] = count
    res = self.session.get(f"{BASE_URL}/users/{username}/gfycats", params=params)
    data = res.json()
    next_cursor: str = data["cursor"]
    return {
        "gfycats": [GfyItem(**self.transform_content_url_key(d)) for d in data["gfycats"]],
        "cursor": next_cursor,
    }
def new_gfycat_from_file(self, file: Union[str, os.PathLike], *, title: str = None, description: str = None, tags: List[str] = None, nsfw: bool = False, ignore_md5_check: bool = False, keep_audio: True, private: False) ‑> str
Expand source code
def new_gfycat_from_file(
    self,
    file: Union[str, PathLike],
    *,
    title: str = None,
    description: str = None,
    tags: List[str] = None,
    nsfw: bool = False,
    ignore_md5_check: bool = False,
    keep_audio: True,
    private: False,
) -> str:
    session = self.session
    file = Path(file).resolve()
    data = {
        "title": file.name if title is None else title,
        "nsfw": nsfw,
        "keepAudio": keep_audio,
        "noMd5": ignore_md5_check,
        "private": private,
    }
    if description is not None:
        data["description"] = description
    if tags is not None:
        data["tags"] = tags
    retries = Retry(total=100, backoff_factor=0.1)
    session.mount("https://", adapter=HTTPAdapter(max_retries=retries))
    res = session.post(f"{BASE_URL}/gfycats", data=data)
    metadata = res.json()
    with open(file, "rb") as f:
        new_file = io.BytesIO(f.read())
        new_file.name = metadata["gfyname"]
        new_file.seek(0)
        m = MultipartEncoder(
            fields={
                "key": metadata["gfyname"],
                "file": (metadata["gfyname"], new_file, "video/mp4"),
            }
        )
        res = requests.post("https://filedrop.gfycat.com", data=m, headers={"Content-Type": m.content_type})
    if res.status_code >= 400:
        raise requests.exceptions.HTTPError(res.text)
    return metadata["gfyname"]
def new_gfycat_from_url(self, url: pydantic.networks.AnyHttpUrl)
Expand source code
def new_gfycat_from_url(self, url: AnyHttpUrl):
    raise NotImplemented
def update_gfycat(self, gfy_id: str, attribute: str, value: Any)
Expand source code
def update_gfycat(self, gfy_id: str, attribute: str, value: Any):
    self.session.put(f"{BASE_URL}/me/gfycats/{gfy_id}/{attribute}", data={"value": value})
    logger.info(f'Updated Gfycat {gfy_id} {attribute} to "{value}"')