""" TogglPy is a non-cluttered, easily understood and implemented library for interacting with the Toggl API. """ import json # parsing json data import math import sys import time from base64 import b64encode from datetime import datetime # for making requests # backward compatibility with python2 cafile = None if sys.version[0] == "2": from urllib import urlencode from urllib2 import urlopen, Request else: from urllib.parse import urlencode from urllib.request import urlopen, Request try: import certifi cafile = certifi.where() except ImportError: pass # -------------------------------------------- # Class containing the endpoint URLs for Toggl # -------------------------------------------- class Endpoints: WORKSPACES = "https://api.track.toggl.com/api/v8/workspaces" CLIENTS = "https://api.track.toggl.com/api/v8/clients" PROJECTS = "https://api.track.toggl.com/api/v8/projects" TASKS = "https://api.track.toggl.com/api/v8/tasks" REPORT_WEEKLY = "https://toggl.com/reports/api/v2/weekly" REPORT_DETAILED = "https://toggl.com/reports/api/v2/details" REPORT_SUMMARY = "https://toggl.com/reports/api/v2/summary" START_TIME = "https://api.track.toggl.com/api/v8/time_entries/start" TIME_ENTRIES = "https://api.track.toggl.com/api/v8/time_entries" CURRENT_RUNNING_TIME = "https://api.track.toggl.com/api/v8/time_entries/current" @staticmethod def STOP_TIME(pid): return "https://api.track.toggl.com/api/v8/time_entries/" + str(pid) + "/stop" # ------------------------------------------------------ # Class containing the necessities for Toggl interaction # ------------------------------------------------------ class Toggl: # template of headers for our request headers = { "Authorization": "", "Content-Type": "application/json", "Accept": "*/*", "User-Agent": "python/urllib", } # default API user agent value user_agent = "TogglPy" # ------------------------------------------------------------ # Auxiliary methods # ------------------------------------------------------------ def decodeJSON(self, jsonString): return json.JSONDecoder().decode(jsonString) # ------------------------------------------------------------ # Methods that modify the headers to control our HTTP requests # ------------------------------------------------------------ def setAPIKey(self, APIKey): """set the API key in the request header""" # craft the Authorization authHeader = APIKey + ":" + "api_token" authHeader = "Basic " + b64encode(authHeader.encode()).decode("ascii").rstrip() # add it into the header self.headers["Authorization"] = authHeader def setAuthCredentials(self, email, password): authHeader = "{0}:{1}".format(email, password) authHeader = "Basic " + b64encode(authHeader.encode()).decode("ascii").rstrip() # add it into the header self.headers["Authorization"] = authHeader def setUserAgent(self, agent): """set the User-Agent setting, by default it's set to TogglPy""" self.user_agent = agent # ----------------------------------------------------- # Methods for directly requesting data from an endpoint # ----------------------------------------------------- def requestRaw(self, endpoint, parameters=None): """make a request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)""" if parameters is None: return urlopen( Request(endpoint, headers=self.headers), cafile=cafile ).read() else: if "user_agent" not in parameters: parameters.update( {"user_agent": self.user_agent} ) # add our class-level user agent in there # encode all of our data for a get request & modify the URL endpoint = endpoint + "?" + urlencode(parameters) # make request and read the response return urlopen( Request(endpoint, headers=self.headers), cafile=cafile ).read() def request(self, endpoint, parameters=None): """make a request to the toggle api at a certain endpoint and return the page data as a parsed JSON dict""" return json.loads(self.requestRaw(endpoint, parameters).decode("utf-8")) def postRequest(self, endpoint, parameters=None, method="POST"): """make a POST request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)""" if ( method == "DELETE" ): # Calls to the API using the DELETE mothod return a HTTP response rather than JSON return urlopen( Request(endpoint, headers=self.headers, method=method), cafile=cafile ).code if parameters is None: return ( urlopen( Request(endpoint, headers=self.headers, method=method), cafile=cafile, ) .read() .decode("utf-8") ) else: data = json.JSONEncoder().encode(parameters) binary_data = data.encode("utf-8") # make request and read the response return ( urlopen( Request( endpoint, data=binary_data, headers=self.headers, method=method ), cafile=cafile, ) .read() .decode("utf-8") ) # --------------------------------- # Methods for managing Time Entries # --------------------------------- def startTimeEntry(self, description, pid=None, tid=None, tags=None): """starts a new Time Entry""" data = { "time_entry": {"created_with": self.user_agent, "description": description} } if pid: data["time_entry"]["pid"] = pid if tid: data["time_entry"]["tid"] = tid if tags: data["time_entry"]["tags"] = tags response = self.postRequest(Endpoints.START_TIME, parameters=data) return self.decodeJSON(response) def currentRunningTimeEntry(self): """Gets the Current Time Entry""" return self.request(Endpoints.CURRENT_RUNNING_TIME) def stopTimeEntry(self, entryid): """Stop the time entry""" response = self.postRequest(Endpoints.STOP_TIME(entryid), method="PUT") return self.decodeJSON(response) def createTimeEntry( self, hourduration, description=None, projectid=None, projectname=None, taskid=None, clientname=None, year=None, month=None, day=None, hour=None, billable=False, hourdiff=-2, ): """ Creating a custom time entry, minimum must is hour duration and project param :param hourduration: :param description: Sets a descripton for the newly created time entry :param projectid: Not required if projectname given :param projectname: Not required if projectid was given :param taskid: Adds a task to the time entry (Requirement: Toggl Starter or higher) :param clientname: Can speed up project query process :param year: Taken from now() if not provided :param month: Taken from now() if not provided :param day: Taken from now() if not provided :param hour: Taken from now() if not provided :return: response object from post call """ data = {"time_entry": {}} if not projectid: if projectname and clientname: projectid = (self.getClientProject(clientname, projectname))["data"][ "id" ] elif projectname: projectid = (self.searchClientProject(projectname))["data"]["id"] else: print("Too many missing parameters for query") exit(1) if description: data["time_entry"]["description"] = description if taskid: data["time_entry"]["tid"] = taskid year = datetime.now().year if not year else year month = datetime.now().month if not month else month day = datetime.now().day if not day else day hour = datetime.now().hour if not hour else hour timestruct = datetime(year, month, day, hour + hourdiff).isoformat() + ".000Z" data["time_entry"]["start"] = timestruct data["time_entry"]["duration"] = hourduration * 3600 data["time_entry"]["pid"] = projectid data["time_entry"]["created_with"] = "NAME" data["time_entry"]["billable"] = billable response = self.postRequest(Endpoints.TIME_ENTRIES, parameters=data) return self.decodeJSON(response) def putTimeEntry(self, parameters): if "id" not in parameters: raise Exception("An id must be provided in order to put a time entry") id = parameters["id"] if type(id) is not int: raise Exception("Invalid id %s provided " % (id)) endpoint = ( Endpoints.TIME_ENTRIES + "/" + str(id) ) # encode all of our data for a put request & modify the URL data = json.JSONEncoder().encode({"time_entry": parameters}) request = Request(endpoint, data=data, headers=self.headers) request.get_method = lambda: "PUT" return json.loads(urlopen(request).read()) def deleteTimeEntry(self, entryid): """ Delete the specified timeEntry :param entryid: The id of the entry to delete """ response = self.postRequest( Endpoints.TIME_ENTRIES + "/{0}".format(entryid), method="DELETE" ) return response # ---------------------------------- # Methods for getting workspace data # ---------------------------------- def getWorkspaces(self): """return all the workspaces for a user""" return self.request(Endpoints.WORKSPACES) def getWorkspace(self, name=None, id=None): """return the first workspace that matches a given name or id""" workspaces = self.getWorkspaces() # get all workspaces # if they give us nothing let them know we're not returning anything if name is None and id is None: print( "Error in getWorkspace(), please enter either a name or an id as a filter" ) return None if id is None: # then we search by name for ( workspace ) in workspaces: # search through them for one matching the name provided if workspace["name"] == name: return workspace # if we find it return it return None # if we get to here and haven't found it return None else: # otherwise search by id for ( workspace ) in workspaces: # search through them for one matching the id provided if workspace["id"] == int(id): return workspace # if we find it return it return None # if we get to here and haven't found it return None def getWorkspaceProjects(self, id): """ Return all of the projects for a given Workspace :param id: Workspace ID by which to query :return: Projects object returned from endpoint """ return self.request(Endpoints.WORKSPACES + "/{0}".format(id) + "/projects") # ------------------------------- # Methods for getting client data # ------------------------------- def getClients(self): """return all clients that are visable to a user""" return self.request(Endpoints.CLIENTS) def getClient(self, name=None, id=None): """return the first workspace that matches a given name or id""" clients = self.getClients() # get all clients # if they give us nothing let them know we're not returning anything if name is None and id is None: print( "Error in getClient(), please enter either a name or an id as a filter" ) return None if id is None: # then we search by name for ( client ) in clients: # search through them for one matching the name provided if client["name"] == name: return client # if we find it return it return None # if we get to here and haven't found it return None else: # otherwise search by id for ( client ) in clients: # search through them for one matching the id provided if client["id"] == int(id): return client # if we find it return it return None # if we get to here and haven't found it return None def getClientProjects(self, id, active="true"): """ :param id: Client ID by which to query :param active: possible values true/false/both. By default true. If false, only archived projects are returned. :return: Projects object returned from endpoint """ return self.request( Endpoints.CLIENTS + "/{0}/projects?active={1}".format(id, active) ) def searchClientProject(self, name): """ Provide only a projects name for query and search through entire available names WARNING: Takes a long time! If client name is known, 'getClientProject' would be advised :param name: Desired Project's name :return: Project object """ for client in self.getClients(): try: for project in self.getClientProjects(client["id"]): if project["name"] == name: return project except Exception: continue print("Could not find client by the name") return None def getClientProject(self, clientName, projectName): """ Fast query given the Client's name and Project's name :param clientName: :param projectName: :return: """ for client in self.getClients(): if client["name"] == clientName: cid = client["id"] if not cid: print("Could not find such client name") return None for projct in self.getClientProjects(cid): if projct["name"] == projectName: pid = projct["id"] if not pid: print("Could not find such project name") return None return self.getProject(pid) # -------------------------------- # Methods for getting PROJECTS data # -------------------------------- def getProject(self, pid): """return all projects that are visable to a user""" return self.request(Endpoints.PROJECTS + "/{0}".format(pid)) def getProjectTasks(self, pid, archived=False): """ return all tasks of a given project :param pid: Project ID :param archived: choose wether to fetch archived tasks or not """ return self.request(Endpoints.PROJECTS + "/{0}".format(pid) + "/tasks") # -------------------------------- # Methods for interacting with TASKS data # -------------------------------- def createTask(self, name, pid, active=True, estimatedSeconds=False): """ create a new task (Requirement: Toggl Starter or higher) :param name: Name of the task :param pid: Project ID :param active: Defines if the task is active or archived, default: active :param estimatedSeconds: Estimation for the task in seconds """ data = {} data["task"] = {} data["task"]["name"] = name data["task"]["pid"] = pid data["task"]["active"] = active data["task"]["estimated_seconds"] = estimatedSeconds response = self.postRequest(Endpoints.TASKS, parameters=data) return self.decodeJSON(response) # -------------------------------- # Methods for getting reports data # --------------------------------- def getWeeklyReport(self, data): """return a weekly report for a user""" return self.request(Endpoints.REPORT_WEEKLY, parameters=data) def getWeeklyReportPDF(self, data, filename): """save a weekly report as a PDF""" # get the raw pdf file data filedata = self.requestRaw(Endpoints.REPORT_WEEKLY + ".pdf", parameters=data) # write the data to a file with open(filename, "wb") as pdf: pdf.write(filedata) def getDetailedReport(self, data): """return a detailed report for a user""" return self.request(Endpoints.REPORT_DETAILED, parameters=data) def getDetailedReportPages(self, data): """return detailed report data from all pages for a user""" pages_index = 1 data["page"] = pages_index pages = self.request(Endpoints.REPORT_DETAILED, parameters=data) try: pages_number = math.ceil( pages.get("total_count", 0) / pages.get("per_page", 0) ) except ZeroDivisionError: pages_number = 0 for pages_index in range(2, pages_number + 1): time.sleep( 1 ) # There is rate limiting of 1 request per second (per IP per API token). data["page"] = pages_index pages["data"].extend( self.request(Endpoints.REPORT_DETAILED, parameters=data).get("data", []) ) return pages def getDetailedReportPDF(self, data, filename): """save a detailed report as a pdf""" # get the raw pdf file data filedata = self.requestRaw(Endpoints.REPORT_DETAILED + ".pdf", parameters=data) # write the data to a file with open(filename, "wb") as pdf: pdf.write(filedata) def getDetailedReportCSV(self, data, filename=None): """save a detailed report as a csv""" # get the raw pdf file data filedata = self.requestRaw(Endpoints.REPORT_DETAILED + ".csv", parameters=data) if filename: # write the data to a file with open(filename, "wb") as pdf: pdf.write(filedata) else: return filedata def getSummaryReport(self, data): """return a summary report for a user""" return self.request(Endpoints.REPORT_SUMMARY, parameters=data) def getSummaryReportPDF(self, data, filename): """save a summary report as a pdf""" # get the raw pdf file data filedata = self.requestRaw(Endpoints.REPORT_SUMMARY + ".pdf", parameters=data) # write the data to a file with open(filename, "wb") as pdf: pdf.write(filedata) # -------------------------------- # Methods for creating, updating, and deleting clients # --------------------------------- def createClient(self, name, wid, notes=None): """ create a new client :param name: Name the client :param wid: Workspace ID :param notes: Notes for the client (optional) """ data = {} data["client"] = {} data["client"]["name"] = name data["client"]["wid"] = wid data["client"]["notes"] = notes response = self.postRequest(Endpoints.CLIENTS, parameters=data) return self.decodeJSON(response) def updateClient(self, id, name=None, notes=None): """ Update data for an existing client. If the name or notes parameter is not supplied, the existing data on the Toggl server will not be changed. :param id: The id of the client to update :param name: Update the name of the client (optional) :param notes: Update the notes for the client (optional) """ data = {} data["client"] = {} data["client"]["name"] = name data["client"]["notes"] = notes response = self.postRequest( Endpoints.CLIENTS + "/{0}".format(id), parameters=data, method="PUT" ) return self.decodeJSON(response) def deleteClient(self, id): """ Delete the specified client :param id: The id of the client to delete """ response = self.postRequest( Endpoints.CLIENTS + "/{0}".format(id), method="DELETE" ) return response