diff --git a/Mapping.py b/Mapping.py index 7d8eefb..68f63a3 100644 --- a/Mapping.py +++ b/Mapping.py @@ -1,8 +1,6 @@ - class Mapping(object): - def __init__(self, side: int, id: int): self.side = side self.id = id self.description = "" - self.tags = [] \ No newline at end of file + self.tags = [] diff --git a/TogglDelegate.py b/TogglDelegate.py index c190001..076618a 100644 --- a/TogglDelegate.py +++ b/TogglDelegate.py @@ -11,47 +11,55 @@ _log = logging.getLogger(__name__) _log.addHandler(logging.StreamHandler()) _log.setLevel(logging.INFO) -class TogglDelegate(ZeiDelegate): +class TogglDelegate(ZeiDelegate): def __init__(self, periph, config): self.config = config self.toggl = Toggl() - self.toggl.setAPIKey(self.config['toggl']['settings']['token']) + self.toggl.setAPIKey(self.config["toggl"]["settings"]["token"]) self._populateProjects() - self._populateMappings(self.config['mappings']) + self._populateMappings(self.config["mappings"]) super().__init__(periph) def handleNotification(self, cHandle, data): - if cHandle == 38: # Side Change Notification - side = struct.unpack('B', data)[0] + if cHandle == 38: # Side Change Notification + side = struct.unpack("B", data)[0] self._trackProjectByMapping(self.mappings[side]) else: _log.info("Notification from hndl: %s - %r", cHandle, data) def _trackProjectByMapping(self, mapping: Mapping): - self._trackProject(description=mapping.description, pid=mapping.id, tags=mapping.tags) + self._trackProject( + description=mapping.description, pid=mapping.id, tags=mapping.tags + ) def _trackProject(self, description: str, pid: int, tags: list): - current = self.toggl.currentRunningTimeEntry()['data'] + current = self.toggl.currentRunningTimeEntry()["data"] if current is not None: - if (datetime.now(timezone.utc) - dateutil.parser.isoparse(current['start'])).total_seconds() < 20: + if ( + datetime.now(timezone.utc) - dateutil.parser.isoparse(current["start"]) + ).total_seconds() < 20: # Delete entry if not older than 20s _log.info("Abort currently running entry") - self.toggl.deleteTimeEntry(current['id']) + self.toggl.deleteTimeEntry(current["id"]) else: _log.info("Stopping currently running entry") - self.toggl.stopTimeEntry(current['id']) + self.toggl.stopTimeEntry(current["id"]) - - _log.info("Now tracking project %s: %s (%s)", self.projects[pid]['name'], description, ', '.join(tags if tags else [])) + _log.info( + "Now tracking project %s: %s (%s)", + self.projects[pid]["name"], + description, + ", ".join(tags if tags else []), + ) if pid == 0: return - + self.toggl.startTimeEntry(description, pid=pid, tags=tags) - + def _populateMappings(self, mappings: dict): - self.mappings = { 0: Mapping(0, 0)} + self.mappings = {0: Mapping(0, 0)} for i in mappings: self.mappings[int(i)] = Mapping(int(i), int(mappings[i]["id"])) if "description" in mappings[i]: @@ -61,9 +69,25 @@ class TogglDelegate(ZeiDelegate): def _populateProjects(self): self.projects = {} - proj = (self.toggl.getWorkspaceProjects(self.config['toggl']['settings']['workspace_id'])) - NoneProj = {'id': 0, 'wid': int(self.config['toggl']['settings']['workspace_id']), 'name': 'None', 'billable': False, 'is_private': True, 'active': True, 'template': False, 'at': '2020-06-09T04:02:38+00:00', 'created_at': '2019-12-09T16:36:28+00:00', 'color': '9', 'auto_estimates': False, 'actual_hours': 0, 'hex_color': '#990099'} + proj = self.toggl.getWorkspaceProjects( + self.config["toggl"]["settings"]["workspace_id"] + ) + NoneProj = { + "id": 0, + "wid": int(self.config["toggl"]["settings"]["workspace_id"]), + "name": "None", + "billable": False, + "is_private": True, + "active": True, + "template": False, + "at": "2020-06-09T04:02:38+00:00", + "created_at": "2019-12-09T16:36:28+00:00", + "color": "9", + "auto_estimates": False, + "actual_hours": 0, + "hex_color": "#990099", + } self.projects[0] = NoneProj for i in proj: - self.projects[i['id']] = i \ No newline at end of file + self.projects[i["id"]] = i diff --git a/main.py b/main.py index aa7905d..f447e71 100755 --- a/main.py +++ b/main.py @@ -12,16 +12,17 @@ _log = logging.getLogger(__name__) _log.addHandler(logging.StreamHandler()) _log.setLevel(logging.INFO) + def main(): config = json.load(open("./config.json", "r")) - zei = Zei(config["zei"]["mac"], 'random', iface=0) + zei = Zei(config["zei"]["mac"], "random", iface=0) zei.withDelegate(TogglDelegate(zei, config)) scanner = ZeiDiscovery(zei) while True: try: - zei.waitForNotifications(timeout=None) + zei.waitForNotifications(timeout=None) except Exception as e: _log.exception(e) scanner.reconnect() diff --git a/toggl/TogglPy.py b/toggl/TogglPy.py index ddc7afb..f203ca9 100644 --- a/toggl/TogglPy.py +++ b/toggl/TogglPy.py @@ -18,8 +18,10 @@ if sys.version[0] == "2": else: from urllib.parse import urlencode from urllib.request import urlopen, Request + try: import certifi + cafile = certifi.where() except ImportError: pass @@ -28,7 +30,7 @@ else: # -------------------------------------------- # Class containing the endpoint URLs for Toggl # -------------------------------------------- -class Endpoints(): +class Endpoints: WORKSPACES = "https://www.toggl.com/api/v8/workspaces" CLIENTS = "https://www.toggl.com/api/v8/clients" PROJECTS = "https://www.toggl.com/api/v8/projects" @@ -48,7 +50,7 @@ class Endpoints(): # ------------------------------------------------------ # Class containing the necessities for Toggl interaction # ------------------------------------------------------ -class Toggl(): +class Toggl: # template of headers for our request headers = { "Authorization": "", @@ -71,23 +73,23 @@ class Toggl(): # Methods that modify the headers to control our HTTP requests # ------------------------------------------------------------ def setAPIKey(self, APIKey): - '''set the API key in the request header''' + """set the API key in the request header""" # craft the Authorization authHeader = APIKey + ":" + "api_token" - authHeader = "Basic " + b64encode(authHeader.encode()).decode('ascii').rstrip() + authHeader = "Basic " + b64encode(authHeader.encode()).decode("ascii").rstrip() # add it into the header - self.headers['Authorization'] = authHeader + self.headers["Authorization"] = authHeader def setAuthCredentials(self, email, password): - authHeader = '{0}:{1}'.format(email, password) - authHeader = "Basic " + b64encode(authHeader.encode()).decode('ascii').rstrip() + authHeader = "{0}:{1}".format(email, password) + authHeader = "Basic " + b64encode(authHeader.encode()).decode("ascii").rstrip() # add it into the header - self.headers['Authorization'] = authHeader + self.headers["Authorization"] = authHeader def setUserAgent(self, agent): - '''set the User-Agent setting, by default it's set to TogglPy''' + """set the User-Agent setting, by default it's set to TogglPy""" self.user_agent = agent # ----------------------------------------------------- @@ -95,47 +97,68 @@ class Toggl(): # ----------------------------------------------------- def requestRaw(self, endpoint, parameters=None): - '''make a request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)''' + """make a request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)""" if parameters is None: - return urlopen(Request(endpoint, headers=self.headers), cafile=cafile).read() + return urlopen( + Request(endpoint, headers=self.headers), cafile=cafile + ).read() else: - if 'user_agent' not in parameters: - parameters.update({'user_agent': self.user_agent}) # add our class-level user agent in there + if "user_agent" not in parameters: + parameters.update( + {"user_agent": self.user_agent} + ) # add our class-level user agent in there # encode all of our data for a get request & modify the URL endpoint = endpoint + "?" + urlencode(parameters) # make request and read the response - return urlopen(Request(endpoint, headers=self.headers), cafile=cafile).read() + return urlopen( + Request(endpoint, headers=self.headers), cafile=cafile + ).read() def request(self, endpoint, parameters=None): - '''make a request to the toggle api at a certain endpoint and return the page data as a parsed JSON dict''' - return json.loads(self.requestRaw(endpoint, parameters).decode('utf-8')) + """make a request to the toggle api at a certain endpoint and return the page data as a parsed JSON dict""" + return json.loads(self.requestRaw(endpoint, parameters).decode("utf-8")) - def postRequest(self, endpoint, parameters=None, method='POST'): - '''make a POST request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)''' - if method == 'DELETE': # Calls to the API using the DELETE mothod return a HTTP response rather than JSON - return urlopen(Request(endpoint, headers=self.headers, method=method), cafile=cafile).code + def postRequest(self, endpoint, parameters=None, method="POST"): + """make a POST request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)""" + if ( + method == "DELETE" + ): # Calls to the API using the DELETE mothod return a HTTP response rather than JSON + return urlopen( + Request(endpoint, headers=self.headers, method=method), cafile=cafile + ).code if parameters is None: - return urlopen(Request(endpoint, headers=self.headers, method=method), cafile=cafile).read().decode('utf-8') + return ( + urlopen( + Request(endpoint, headers=self.headers, method=method), + cafile=cafile, + ) + .read() + .decode("utf-8") + ) else: data = json.JSONEncoder().encode(parameters) - binary_data = data.encode('utf-8') + binary_data = data.encode("utf-8") # make request and read the response - return urlopen( - Request(endpoint, data=binary_data, headers=self.headers, method=method), cafile=cafile - ).read().decode('utf-8') + return ( + urlopen( + Request( + endpoint, data=binary_data, headers=self.headers, method=method + ), + cafile=cafile, + ) + .read() + .decode("utf-8") + ) # --------------------------------- # Methods for managing Time Entries # --------------------------------- def startTimeEntry(self, description, pid=None, tid=None, tags=None): - '''starts a new Time Entry''' + """starts a new Time Entry""" data = { - "time_entry": { - "created_with": self.user_agent, - "description": description - } + "time_entry": {"created_with": self.user_agent, "description": description} } if pid: data["time_entry"]["pid"] = pid @@ -145,22 +168,33 @@ class Toggl(): if tags: data["time_entry"]["tags"] = tags - response = self.postRequest(Endpoints.START_TIME, parameters=data) return self.decodeJSON(response) def currentRunningTimeEntry(self): - '''Gets the Current Time Entry''' + """Gets the Current Time Entry""" return self.request(Endpoints.CURRENT_RUNNING_TIME) def stopTimeEntry(self, entryid): - '''Stop the time entry''' - response = self.postRequest(Endpoints.STOP_TIME(entryid), method='PUT') + """Stop the time entry""" + response = self.postRequest(Endpoints.STOP_TIME(entryid), method="PUT") return self.decodeJSON(response) - def createTimeEntry(self, hourduration, description=None, projectid=None, projectname=None, - taskid=None, clientname=None, year=None, month=None, day=None, hour=None, - billable=False, hourdiff=-2): + def createTimeEntry( + self, + hourduration, + description=None, + projectid=None, + projectname=None, + taskid=None, + clientname=None, + year=None, + month=None, + day=None, + hour=None, + billable=False, + hourdiff=-2, + ): """ Creating a custom time entry, minimum must is hour duration and project param :param hourduration: @@ -175,87 +209,95 @@ class Toggl(): :param hour: Taken from now() if not provided :return: response object from post call """ - data = { - "time_entry": {} - } + data = {"time_entry": {}} if not projectid: if projectname and clientname: - projectid = (self.getClientProject(clientname, projectname))['data']['id'] + projectid = (self.getClientProject(clientname, projectname))["data"][ + "id" + ] elif projectname: - projectid = (self.searchClientProject(projectname))['data']['id'] + projectid = (self.searchClientProject(projectname))["data"]["id"] else: - print('Too many missing parameters for query') + print("Too many missing parameters for query") exit(1) if description: - data['time_entry']['description'] = description + data["time_entry"]["description"] = description if taskid: - data['time_entry']['tid'] = taskid + data["time_entry"]["tid"] = taskid year = datetime.now().year if not year else year month = datetime.now().month if not month else month day = datetime.now().day if not day else day hour = datetime.now().hour if not hour else hour - timestruct = datetime(year, month, day, hour + hourdiff).isoformat() + '.000Z' - data['time_entry']['start'] = timestruct - data['time_entry']['duration'] = hourduration * 3600 - data['time_entry']['pid'] = projectid - data['time_entry']['created_with'] = 'NAME' - data['time_entry']['billable'] = billable + timestruct = datetime(year, month, day, hour + hourdiff).isoformat() + ".000Z" + data["time_entry"]["start"] = timestruct + data["time_entry"]["duration"] = hourduration * 3600 + data["time_entry"]["pid"] = projectid + data["time_entry"]["created_with"] = "NAME" + data["time_entry"]["billable"] = billable response = self.postRequest(Endpoints.TIME_ENTRIES, parameters=data) return self.decodeJSON(response) def putTimeEntry(self, parameters): - if 'id' not in parameters: + if "id" not in parameters: raise Exception("An id must be provided in order to put a time entry") - id = parameters['id'] + id = parameters["id"] if type(id) is not int: raise Exception("Invalid id %s provided " % (id)) - endpoint = Endpoints.TIME_ENTRIES + "/" + str(id) # encode all of our data for a put request & modify the URL - data = json.JSONEncoder().encode({'time_entry': parameters}) + endpoint = ( + Endpoints.TIME_ENTRIES + "/" + str(id) + ) # encode all of our data for a put request & modify the URL + data = json.JSONEncoder().encode({"time_entry": parameters}) request = Request(endpoint, data=data, headers=self.headers) request.get_method = lambda: "PUT" return json.loads(urlopen(request).read()) - def deleteTimeEntry(self, entryid): """ Delete the specified timeEntry :param entryid: The id of the entry to delete """ - response = self.postRequest(Endpoints.TIME_ENTRIES + '/{0}'.format(entryid), method='DELETE') + response = self.postRequest( + Endpoints.TIME_ENTRIES + "/{0}".format(entryid), method="DELETE" + ) return response - # ---------------------------------- # Methods for getting workspace data # ---------------------------------- def getWorkspaces(self): - '''return all the workspaces for a user''' + """return all the workspaces for a user""" return self.request(Endpoints.WORKSPACES) def getWorkspace(self, name=None, id=None): - '''return the first workspace that matches a given name or id''' + """return the first workspace that matches a given name or id""" workspaces = self.getWorkspaces() # get all workspaces # if they give us nothing let them know we're not returning anything if name is None and id is None: - print("Error in getWorkspace(), please enter either a name or an id as a filter") + print( + "Error in getWorkspace(), please enter either a name or an id as a filter" + ) return None if id is None: # then we search by name - for workspace in workspaces: # search through them for one matching the name provided - if workspace['name'] == name: + for ( + workspace + ) in workspaces: # search through them for one matching the name provided + if workspace["name"] == name: return workspace # if we find it return it return None # if we get to here and haven't found it return None else: # otherwise search by id - for workspace in workspaces: # search through them for one matching the id provided - if workspace['id'] == int(id): + for ( + workspace + ) in workspaces: # search through them for one matching the id provided + if workspace["id"] == int(id): return workspace # if we find it return it return None # if we get to here and haven't found it return None @@ -266,43 +308,51 @@ class Toggl(): :return: Projects object returned from endpoint """ - return self.request(Endpoints.WORKSPACES + '/{0}'.format(id) + '/projects') + return self.request(Endpoints.WORKSPACES + "/{0}".format(id) + "/projects") # ------------------------------- # Methods for getting client data # ------------------------------- def getClients(self): - '''return all clients that are visable to a user''' + """return all clients that are visable to a user""" return self.request(Endpoints.CLIENTS) def getClient(self, name=None, id=None): - '''return the first workspace that matches a given name or id''' + """return the first workspace that matches a given name or id""" clients = self.getClients() # get all clients # if they give us nothing let them know we're not returning anything if name is None and id is None: - print("Error in getClient(), please enter either a name or an id as a filter") + print( + "Error in getClient(), please enter either a name or an id as a filter" + ) return None if id is None: # then we search by name - for client in clients: # search through them for one matching the name provided - if client['name'] == name: + for ( + client + ) in clients: # search through them for one matching the name provided + if client["name"] == name: return client # if we find it return it return None # if we get to here and haven't found it return None else: # otherwise search by id - for client in clients: # search through them for one matching the id provided - if client['id'] == int(id): + for ( + client + ) in clients: # search through them for one matching the id provided + if client["id"] == int(id): return client # if we find it return it return None # if we get to here and haven't found it return None - def getClientProjects(self, id, active='true'): + def getClientProjects(self, id, active="true"): """ :param id: Client ID by which to query :param active: possible values true/false/both. By default true. If false, only archived projects are returned. :return: Projects object returned from endpoint """ - return self.request(Endpoints.CLIENTS + '/{0}/projects?active={1}'.format(id, active)) + return self.request( + Endpoints.CLIENTS + "/{0}/projects?active={1}".format(id, active) + ) def searchClientProject(self, name): """ @@ -314,13 +364,13 @@ class Toggl(): """ for client in self.getClients(): try: - for project in self.getClientProjects(client['id']): - if project['name'] == name: + for project in self.getClientProjects(client["id"]): + if project["name"] == name: return project except Exception: continue - print('Could not find client by the name') + print("Could not find client by the name") return None def getClientProject(self, clientName, projectName): @@ -331,19 +381,19 @@ class Toggl(): :return: """ for client in self.getClients(): - if client['name'] == clientName: - cid = client['id'] + if client["name"] == clientName: + cid = client["id"] if not cid: - print('Could not find such client name') + print("Could not find such client name") return None for projct in self.getClientProjects(cid): - if projct['name'] == projectName: - pid = projct['id'] + if projct["name"] == projectName: + pid = projct["id"] if not pid: - print('Could not find such project name') + print("Could not find such project name") return None return self.getProject(pid) @@ -352,8 +402,8 @@ class Toggl(): # Methods for getting PROJECTS data # -------------------------------- def getProject(self, pid): - '''return all projects that are visable to a user''' - return self.request(Endpoints.PROJECTS + '/{0}'.format(pid)) + """return all projects that are visable to a user""" + return self.request(Endpoints.PROJECTS + "/{0}".format(pid)) def getProjectTasks(self, pid, archived=False): """ @@ -361,7 +411,7 @@ class Toggl(): :param pid: Project ID :param archived: choose wether to fetch archived tasks or not """ - return self.request(Endpoints.PROJECTS + '/{0}'.format(pid) + '/tasks') + return self.request(Endpoints.PROJECTS + "/{0}".format(pid) + "/tasks") # -------------------------------- # Methods for interacting with TASKS data @@ -377,11 +427,11 @@ class Toggl(): """ data = {} - data['task'] = {} - data['task']['name'] = name - data['task']['pid'] = pid - data['task']['active'] = active - data['task']['estimated_seconds'] = estimatedSeconds + data["task"] = {} + data["task"]["name"] = name + data["task"]["pid"] = pid + data["task"]["active"] = active + data["task"]["estimated_seconds"] = estimatedSeconds response = self.postRequest(Endpoints.TASKS, parameters=data) return self.decodeJSON(response) @@ -390,11 +440,11 @@ class Toggl(): # Methods for getting reports data # --------------------------------- def getWeeklyReport(self, data): - '''return a weekly report for a user''' + """return a weekly report for a user""" return self.request(Endpoints.REPORT_WEEKLY, parameters=data) def getWeeklyReportPDF(self, data, filename): - '''save a weekly report as a PDF''' + """save a weekly report as a PDF""" # get the raw pdf file data filedata = self.requestRaw(Endpoints.REPORT_WEEKLY + ".pdf", parameters=data) @@ -403,26 +453,32 @@ class Toggl(): pdf.write(filedata) def getDetailedReport(self, data): - '''return a detailed report for a user''' + """return a detailed report for a user""" return self.request(Endpoints.REPORT_DETAILED, parameters=data) def getDetailedReportPages(self, data): - '''return detailed report data from all pages for a user''' + """return detailed report data from all pages for a user""" pages_index = 1 - data['page'] = pages_index + data["page"] = pages_index pages = self.request(Endpoints.REPORT_DETAILED, parameters=data) try: - pages_number = math.ceil(pages.get('total_count', 0) / pages.get('per_page', 0)) + pages_number = math.ceil( + pages.get("total_count", 0) / pages.get("per_page", 0) + ) except ZeroDivisionError: pages_number = 0 for pages_index in range(2, pages_number + 1): - time.sleep(1) # There is rate limiting of 1 request per second (per IP per API token). - data['page'] = pages_index - pages['data'].extend(self.request(Endpoints.REPORT_DETAILED, parameters=data).get('data', [])) + time.sleep( + 1 + ) # There is rate limiting of 1 request per second (per IP per API token). + data["page"] = pages_index + pages["data"].extend( + self.request(Endpoints.REPORT_DETAILED, parameters=data).get("data", []) + ) return pages def getDetailedReportPDF(self, data, filename): - '''save a detailed report as a pdf''' + """save a detailed report as a pdf""" # get the raw pdf file data filedata = self.requestRaw(Endpoints.REPORT_DETAILED + ".pdf", parameters=data) @@ -431,7 +487,7 @@ class Toggl(): pdf.write(filedata) def getDetailedReportCSV(self, data, filename=None): - '''save a detailed report as a csv''' + """save a detailed report as a csv""" # get the raw pdf file data filedata = self.requestRaw(Endpoints.REPORT_DETAILED + ".csv", parameters=data) @@ -443,11 +499,11 @@ class Toggl(): return filedata def getSummaryReport(self, data): - '''return a summary report for a user''' + """return a summary report for a user""" return self.request(Endpoints.REPORT_SUMMARY, parameters=data) def getSummaryReportPDF(self, data, filename): - '''save a summary report as a pdf''' + """save a summary report as a pdf""" # get the raw pdf file data filedata = self.requestRaw(Endpoints.REPORT_SUMMARY + ".pdf", parameters=data) @@ -467,10 +523,10 @@ class Toggl(): """ data = {} - data['client'] = {} - data['client']['name'] = name - data['client']['wid'] = wid - data['client']['notes'] = notes + data["client"] = {} + data["client"]["name"] = name + data["client"]["wid"] = wid + data["client"]["notes"] = notes response = self.postRequest(Endpoints.CLIENTS, parameters=data) return self.decodeJSON(response) @@ -484,11 +540,13 @@ class Toggl(): """ data = {} - data['client'] = {} - data['client']['name'] = name - data['client']['notes'] = notes + data["client"] = {} + data["client"]["name"] = name + data["client"]["notes"] = notes - response = self.postRequest(Endpoints.CLIENTS + '/{0}'.format(id), parameters=data, method='PUT') + response = self.postRequest( + Endpoints.CLIENTS + "/{0}".format(id), parameters=data, method="PUT" + ) return self.decodeJSON(response) def deleteClient(self, id): @@ -496,5 +554,7 @@ class Toggl(): Delete the specified client :param id: The id of the client to delete """ - response = self.postRequest(Endpoints.CLIENTS + '/{0}'.format(id), method='DELETE') + response = self.postRequest( + Endpoints.CLIENTS + "/{0}".format(id), method="DELETE" + ) return response diff --git a/toggl/__init__.py b/toggl/__init__.py index d3ee426..19a53c7 100644 --- a/toggl/__init__.py +++ b/toggl/__init__.py @@ -1,2 +1,2 @@ -__version__ = '0.1.1' -__all__ = ['TogglPy'] +__version__ = "0.1.1" +__all__ = ["TogglPy"] diff --git a/toggl/tests.py b/toggl/tests.py index 8eb4938..e93e864 100644 --- a/toggl/tests.py +++ b/toggl/tests.py @@ -18,20 +18,19 @@ from toggl.TogglPy import Toggl # - - class TogglPyTests(unittest.TestCase): - def setUp(self): - self.api_key = os.environ['TOGGL_API_KEY'] + self.api_key = os.environ["TOGGL_API_KEY"] if self.api_key is None: raise Exception("Unable to execute api tests without an api key") - self.workspace_id = os.environ['WORKSPACE_ID'] + self.workspace_id = os.environ["WORKSPACE_ID"] if self.workspace_id is None: - raise Exception("Unable to execute api tests without a workspace key to query") + raise Exception( + "Unable to execute api tests without a workspace key to query" + ) - self.toggl = Toggl() + self.toggl = Toggl() self.toggl.setAPIKey(self.api_key) def test_connect(self): @@ -40,31 +39,34 @@ class TogglPyTests(unittest.TestCase): def test_putTimeEntry(self): request_args = { - 'workspace_id': self.workspace_id, + "workspace_id": self.workspace_id, } entries = self.toggl.getDetailedReport(request_args) - #for this tests I'm tagging my Pomodoro Entries - missing_projects = [r for r in entries['data'] if r['project'] is None and 'Pomodoro' in r['description'] ] + # for this tests I'm tagging my Pomodoro Entries + missing_projects = [ + r + for r in entries["data"] + if r["project"] is None and "Pomodoro" in r["description"] + ] me = missing_projects[0] - me_id = me['id'] #remember for later + me_id = me["id"] # remember for later - #I've tagged my pomodoro entries as Self/Self + # I've tagged my pomodoro entries as Self/Self cp = self.toggl.getClientProject("Self", "Self") - project_id = cp['data']['id'] - me['pid'] = project_id + project_id = cp["data"]["id"] + me["pid"] = project_id - #his is the new stuff - response = self.toggl.putTimeEntry({"id": me_id, "pid":project_id}) + # his is the new stuff + response = self.toggl.putTimeEntry({"id": me_id, "pid": project_id}) self.assertTrue(response is not None) - self.assertTrue('data' in response) - self.assertTrue(response['data']['pid'] == project_id) - + self.assertTrue("data" in response) + self.assertTrue(response["data"]["pid"] == project_id) def test_getDetailedReportCSV(self): data = { - 'workspace_id': self.workspace_id, + "workspace_id": self.workspace_id, } - csvfile = 'data.csv' + csvfile = "data.csv" self.toggl.getDetailedReportCSV(data, csvfile) self.assertTrue(os.path.isfile(csvfile)) os.remove(csvfile) @@ -72,26 +74,26 @@ class TogglPyTests(unittest.TestCase): data = self.toggl.getDetailedReportCSV(data) self.assertTrue(data is not None) - def test_getDetailedReport(self): data = { - 'workspace_id': self.workspace_id, + "workspace_id": self.workspace_id, } d = self.toggl.getDetailedReport(data) self.assertTrue(d is not None) - self.assertTrue(len(d.keys()) > 0 ) - fields = ['total_count', 'total_currencies', 'total_billable', 'data'] + self.assertTrue(len(d.keys()) > 0) + fields = ["total_count", "total_currencies", "total_billable", "data"] for f in fields: self.assertTrue(f in d.keys()) - data = d['data'] - self.assertTrue(len(data)>0) + data = d["data"] + self.assertTrue(len(data) > 0) dr = data[0] - self.assertTrue('client' in dr) - self.assertTrue('start' in dr) - self.assertTrue('end' in dr) - self.assertTrue('task' in dr) - self.assertTrue('user' in dr) - self.assertTrue('project' in dr) + self.assertTrue("client" in dr) + self.assertTrue("start" in dr) + self.assertTrue("end" in dr) + self.assertTrue("task" in dr) + self.assertTrue("user" in dr) + self.assertTrue("project" in dr) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/toggl/toggl2gsuite.py b/toggl/toggl2gsuite.py index f93599c..23adeeb 100644 --- a/toggl/toggl2gsuite.py +++ b/toggl/toggl2gsuite.py @@ -5,25 +5,26 @@ import gspread from oauth2client.service_account import ServiceAccountCredentials from toggl.TogglPy import Toggl -#this test demonstrates how to link up the toggl API into a google sheet -#in order to do this, you'll need to first setup your google account developer environment -#to do this, you can follow the instructions here: http://tinaja.computer/2017/10/27/gspread.html -#additional information about the spread API here: https://github.com/burnash/gspread +# this test demonstrates how to link up the toggl API into a google sheet +# in order to do this, you'll need to first setup your google account developer environment +# to do this, you can follow the instructions here: http://tinaja.computer/2017/10/27/gspread.html +# additional information about the spread API here: https://github.com/burnash/gspread + +# as such, to run this test you'll need to define the following env variables +# TOGGL_API_KEY : your toggl api key +# WORKSPACE_ID: a workspace id that you'd like to dump data for +# KEYFILE: the full path to your google suite keyfile (keep this secret/safe!) +# SHEET_URL: the url of the google sheet you are writing to -#as such, to run this test you'll need to define the following env variables -#TOGGL_API_KEY : your toggl api key -#WORKSPACE_ID: a workspace id that you'd like to dump data for -#KEYFILE: the full path to your google suite keyfile (keep this secret/safe!) -#SHEET_URL: the url of the google sheet you are writing to class Toggl2GSuiteTest(unittest.TestCase): def setUp(self): - self.api_key = os.environ['TOGGL_API_KEY'] + self.api_key = os.environ["TOGGL_API_KEY"] self.toggl = Toggl() self.toggl.setAPIKey(self.api_key) # see https://stackoverflow.com/questions/19153462/get-excel-style-column-names-from-column-number - LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' + LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" @staticmethod def excel_style(row, col): @@ -32,35 +33,48 @@ class Toggl2GSuiteTest(unittest.TestCase): while col: col, rem = divmod(col - 1, 26) result[:0] = Toggl2GSuiteTest.LETTERS[rem] - return ''.join(result) + str(row) + return "".join(result) + str(row) def test_toggl2gsuite(self): # have to do this year by year data = { - 'workspace_id': os.environ['WORKSPACE_ID'], + "workspace_id": os.environ["WORKSPACE_ID"], } y = self.toggl.getDetailedReport(data) - credentials = ServiceAccountCredentials.from_json_keyfile_name( - os.environ['KEYFILE'], - ['https://spreadsheets.google.com/feeds']) + os.environ["KEYFILE"], ["https://spreadsheets.google.com/feeds"] + ) client = gspread.authorize(credentials) - sheet = client.open_by_url(os.environ['SHEET_URL']) + sheet = client.open_by_url(os.environ["SHEET_URL"]) worksheet = sheet.get_worksheet(0) wrote_header = False - columns_to_write = ['user', 'updated', 'start', 'end', 'client', 'project', 'description', 'is_billable', - 'billable'] + columns_to_write = [ + "user", + "updated", + "start", + "end", + "client", + "project", + "description", + "is_billable", + "billable", + ] cell_row = 0 - for row_idx, rec in enumerate(y['data']): + for row_idx, rec in enumerate(y["data"]): if wrote_header == False: for col_idx, header in enumerate(columns_to_write): - worksheet.update_acell(Toggl2GSuiteTest.excel_style(row_idx + 1, col_idx + 1), header) + worksheet.update_acell( + Toggl2GSuiteTest.excel_style(row_idx + 1, col_idx + 1), header + ) wrote_header = True for col_idx, header in enumerate(columns_to_write): - worksheet.update_acell(Toggl2GSuiteTest.excel_style(row_idx + 2, col_idx + 1), rec[header]) + worksheet.update_acell( + Toggl2GSuiteTest.excel_style(row_idx + 2, col_idx + 1), rec[header] + ) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/zei/BatteryLevelChar.py b/zei/BatteryLevelChar.py index b61a056..5ac1b1f 100644 --- a/zei/BatteryLevelChar.py +++ b/zei/BatteryLevelChar.py @@ -12,4 +12,4 @@ class BatteryLevelChar(ZeiCharBase.ZeiCharBase): charUUID = btle.AssignedNumbers.battery_level def __init__(self, periph): - super.__init__(self, periph) \ No newline at end of file + super.__init__(self, periph) diff --git a/zei/Zei.py b/zei/Zei.py index 5e799a4..872712f 100644 --- a/zei/Zei.py +++ b/zei/Zei.py @@ -6,12 +6,12 @@ from bluepy import btle from . import ZeiOrientationChar from . import ZeiDelegate -class Zei(btle.Peripheral): +class Zei(btle.Peripheral): def __init__(self, *args, **kwargs): btle.Peripheral.__init__(self, *args, **kwargs) self.withDelegate(ZeiDelegate.ZeiDelegate(self)) # activate notifications about turn self.orientation = ZeiOrientationChar.ZeiOrientationChar(self) - self.orientation.enable() \ No newline at end of file + self.orientation.enable() diff --git a/zei/ZeiCharBase.py b/zei/ZeiCharBase.py index 9b27ce7..8b0fb90 100644 --- a/zei/ZeiCharBase.py +++ b/zei/ZeiCharBase.py @@ -6,12 +6,11 @@ import struct class ZeiCharBase(object): - def __init__(self, periph): self.periph = periph self.hndl = None - #self.svcUUID = None - #self.charUUID = None + # self.svcUUID = None + # self.charUUID = None # pylint: disable=E1101 def enable(self): @@ -20,5 +19,7 @@ class ZeiCharBase(object): self.hndl = _chr.getHandle() # this is uint16_t - see: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.descriptor.gatt.client_characteristic_configuration.xml - _cccd = _chr.getDescriptors(btle.AssignedNumbers.client_characteristic_configuration)[0] + _cccd = _chr.getDescriptors( + btle.AssignedNumbers.client_characteristic_configuration + )[0] _cccd.write(struct.pack("