Reformat code using black

This commit is contained in:
Tobias Manske 2020-08-16 00:00:08 +02:00
parent 000e3d7f8a
commit 607bb18fb3
Signed by: tobias
GPG Key ID: D5914DC71F2F9352
15 changed files with 317 additions and 217 deletions

View File

@ -1,8 +1,6 @@
class Mapping(object): class Mapping(object):
def __init__(self, side: int, id: int): def __init__(self, side: int, id: int):
self.side = side self.side = side
self.id = id self.id = id
self.description = "" self.description = ""
self.tags = [] self.tags = []

View File

@ -11,47 +11,55 @@ _log = logging.getLogger(__name__)
_log.addHandler(logging.StreamHandler()) _log.addHandler(logging.StreamHandler())
_log.setLevel(logging.INFO) _log.setLevel(logging.INFO)
class TogglDelegate(ZeiDelegate):
class TogglDelegate(ZeiDelegate):
def __init__(self, periph, config): def __init__(self, periph, config):
self.config = config self.config = config
self.toggl = Toggl() self.toggl = Toggl()
self.toggl.setAPIKey(self.config['toggl']['settings']['token']) self.toggl.setAPIKey(self.config["toggl"]["settings"]["token"])
self._populateProjects() self._populateProjects()
self._populateMappings(self.config['mappings']) self._populateMappings(self.config["mappings"])
super().__init__(periph) super().__init__(periph)
def handleNotification(self, cHandle, data): def handleNotification(self, cHandle, data):
if cHandle == 38: # Side Change Notification if cHandle == 38: # Side Change Notification
side = struct.unpack('B', data)[0] side = struct.unpack("B", data)[0]
self._trackProjectByMapping(self.mappings[side]) self._trackProjectByMapping(self.mappings[side])
else: else:
_log.info("Notification from hndl: %s - %r", cHandle, data) _log.info("Notification from hndl: %s - %r", cHandle, data)
def _trackProjectByMapping(self, mapping: Mapping): def _trackProjectByMapping(self, mapping: Mapping):
self._trackProject(description=mapping.description, pid=mapping.id, tags=mapping.tags) self._trackProject(
description=mapping.description, pid=mapping.id, tags=mapping.tags
)
def _trackProject(self, description: str, pid: int, tags: list): def _trackProject(self, description: str, pid: int, tags: list):
current = self.toggl.currentRunningTimeEntry()['data'] current = self.toggl.currentRunningTimeEntry()["data"]
if current is not None: if current is not None:
if (datetime.now(timezone.utc) - dateutil.parser.isoparse(current['start'])).total_seconds() < 20: if (
datetime.now(timezone.utc) - dateutil.parser.isoparse(current["start"])
).total_seconds() < 20:
# Delete entry if not older than 20s # Delete entry if not older than 20s
_log.info("Abort currently running entry") _log.info("Abort currently running entry")
self.toggl.deleteTimeEntry(current['id']) self.toggl.deleteTimeEntry(current["id"])
else: else:
_log.info("Stopping currently running entry") _log.info("Stopping currently running entry")
self.toggl.stopTimeEntry(current['id']) self.toggl.stopTimeEntry(current["id"])
_log.info(
_log.info("Now tracking project %s: %s (%s)", self.projects[pid]['name'], description, ', '.join(tags if tags else [])) "Now tracking project %s: %s (%s)",
self.projects[pid]["name"],
description,
", ".join(tags if tags else []),
)
if pid == 0: if pid == 0:
return return
self.toggl.startTimeEntry(description, pid=pid, tags=tags) self.toggl.startTimeEntry(description, pid=pid, tags=tags)
def _populateMappings(self, mappings: dict): def _populateMappings(self, mappings: dict):
self.mappings = { 0: Mapping(0, 0)} self.mappings = {0: Mapping(0, 0)}
for i in mappings: for i in mappings:
self.mappings[int(i)] = Mapping(int(i), int(mappings[i]["id"])) self.mappings[int(i)] = Mapping(int(i), int(mappings[i]["id"]))
if "description" in mappings[i]: if "description" in mappings[i]:
@ -61,9 +69,25 @@ class TogglDelegate(ZeiDelegate):
def _populateProjects(self): def _populateProjects(self):
self.projects = {} self.projects = {}
proj = (self.toggl.getWorkspaceProjects(self.config['toggl']['settings']['workspace_id'])) proj = self.toggl.getWorkspaceProjects(
NoneProj = {'id': 0, 'wid': int(self.config['toggl']['settings']['workspace_id']), 'name': 'None', 'billable': False, 'is_private': True, 'active': True, 'template': False, 'at': '2020-06-09T04:02:38+00:00', 'created_at': '2019-12-09T16:36:28+00:00', 'color': '9', 'auto_estimates': False, 'actual_hours': 0, 'hex_color': '#990099'} self.config["toggl"]["settings"]["workspace_id"]
)
NoneProj = {
"id": 0,
"wid": int(self.config["toggl"]["settings"]["workspace_id"]),
"name": "None",
"billable": False,
"is_private": True,
"active": True,
"template": False,
"at": "2020-06-09T04:02:38+00:00",
"created_at": "2019-12-09T16:36:28+00:00",
"color": "9",
"auto_estimates": False,
"actual_hours": 0,
"hex_color": "#990099",
}
self.projects[0] = NoneProj self.projects[0] = NoneProj
for i in proj: for i in proj:
self.projects[i['id']] = i self.projects[i["id"]] = i

View File

@ -12,16 +12,17 @@ _log = logging.getLogger(__name__)
_log.addHandler(logging.StreamHandler()) _log.addHandler(logging.StreamHandler())
_log.setLevel(logging.INFO) _log.setLevel(logging.INFO)
def main(): def main():
config = json.load(open("./config.json", "r")) config = json.load(open("./config.json", "r"))
zei = Zei(config["zei"]["mac"], 'random', iface=0) zei = Zei(config["zei"]["mac"], "random", iface=0)
zei.withDelegate(TogglDelegate(zei, config)) zei.withDelegate(TogglDelegate(zei, config))
scanner = ZeiDiscovery(zei) scanner = ZeiDiscovery(zei)
while True: while True:
try: try:
zei.waitForNotifications(timeout=None) zei.waitForNotifications(timeout=None)
except Exception as e: except Exception as e:
_log.exception(e) _log.exception(e)
scanner.reconnect() scanner.reconnect()

View File

@ -18,8 +18,10 @@ if sys.version[0] == "2":
else: else:
from urllib.parse import urlencode from urllib.parse import urlencode
from urllib.request import urlopen, Request from urllib.request import urlopen, Request
try: try:
import certifi import certifi
cafile = certifi.where() cafile = certifi.where()
except ImportError: except ImportError:
pass pass
@ -28,7 +30,7 @@ else:
# -------------------------------------------- # --------------------------------------------
# Class containing the endpoint URLs for Toggl # Class containing the endpoint URLs for Toggl
# -------------------------------------------- # --------------------------------------------
class Endpoints(): class Endpoints:
WORKSPACES = "https://www.toggl.com/api/v8/workspaces" WORKSPACES = "https://www.toggl.com/api/v8/workspaces"
CLIENTS = "https://www.toggl.com/api/v8/clients" CLIENTS = "https://www.toggl.com/api/v8/clients"
PROJECTS = "https://www.toggl.com/api/v8/projects" PROJECTS = "https://www.toggl.com/api/v8/projects"
@ -48,7 +50,7 @@ class Endpoints():
# ------------------------------------------------------ # ------------------------------------------------------
# Class containing the necessities for Toggl interaction # Class containing the necessities for Toggl interaction
# ------------------------------------------------------ # ------------------------------------------------------
class Toggl(): class Toggl:
# template of headers for our request # template of headers for our request
headers = { headers = {
"Authorization": "", "Authorization": "",
@ -71,23 +73,23 @@ class Toggl():
# Methods that modify the headers to control our HTTP requests # Methods that modify the headers to control our HTTP requests
# ------------------------------------------------------------ # ------------------------------------------------------------
def setAPIKey(self, APIKey): def setAPIKey(self, APIKey):
'''set the API key in the request header''' """set the API key in the request header"""
# craft the Authorization # craft the Authorization
authHeader = APIKey + ":" + "api_token" authHeader = APIKey + ":" + "api_token"
authHeader = "Basic " + b64encode(authHeader.encode()).decode('ascii').rstrip() authHeader = "Basic " + b64encode(authHeader.encode()).decode("ascii").rstrip()
# add it into the header # add it into the header
self.headers['Authorization'] = authHeader self.headers["Authorization"] = authHeader
def setAuthCredentials(self, email, password): def setAuthCredentials(self, email, password):
authHeader = '{0}:{1}'.format(email, password) authHeader = "{0}:{1}".format(email, password)
authHeader = "Basic " + b64encode(authHeader.encode()).decode('ascii').rstrip() authHeader = "Basic " + b64encode(authHeader.encode()).decode("ascii").rstrip()
# add it into the header # add it into the header
self.headers['Authorization'] = authHeader self.headers["Authorization"] = authHeader
def setUserAgent(self, agent): def setUserAgent(self, agent):
'''set the User-Agent setting, by default it's set to TogglPy''' """set the User-Agent setting, by default it's set to TogglPy"""
self.user_agent = agent self.user_agent = agent
# ----------------------------------------------------- # -----------------------------------------------------
@ -95,47 +97,68 @@ class Toggl():
# ----------------------------------------------------- # -----------------------------------------------------
def requestRaw(self, endpoint, parameters=None): def requestRaw(self, endpoint, parameters=None):
'''make a request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)''' """make a request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)"""
if parameters is None: if parameters is None:
return urlopen(Request(endpoint, headers=self.headers), cafile=cafile).read() return urlopen(
Request(endpoint, headers=self.headers), cafile=cafile
).read()
else: else:
if 'user_agent' not in parameters: if "user_agent" not in parameters:
parameters.update({'user_agent': self.user_agent}) # add our class-level user agent in there parameters.update(
{"user_agent": self.user_agent}
) # add our class-level user agent in there
# encode all of our data for a get request & modify the URL # encode all of our data for a get request & modify the URL
endpoint = endpoint + "?" + urlencode(parameters) endpoint = endpoint + "?" + urlencode(parameters)
# make request and read the response # make request and read the response
return urlopen(Request(endpoint, headers=self.headers), cafile=cafile).read() return urlopen(
Request(endpoint, headers=self.headers), cafile=cafile
).read()
def request(self, endpoint, parameters=None): def request(self, endpoint, parameters=None):
'''make a request to the toggle api at a certain endpoint and return the page data as a parsed JSON dict''' """make a request to the toggle api at a certain endpoint and return the page data as a parsed JSON dict"""
return json.loads(self.requestRaw(endpoint, parameters).decode('utf-8')) return json.loads(self.requestRaw(endpoint, parameters).decode("utf-8"))
def postRequest(self, endpoint, parameters=None, method='POST'): def postRequest(self, endpoint, parameters=None, method="POST"):
'''make a POST request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)''' """make a POST request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)"""
if method == 'DELETE': # Calls to the API using the DELETE mothod return a HTTP response rather than JSON if (
return urlopen(Request(endpoint, headers=self.headers, method=method), cafile=cafile).code method == "DELETE"
): # Calls to the API using the DELETE mothod return a HTTP response rather than JSON
return urlopen(
Request(endpoint, headers=self.headers, method=method), cafile=cafile
).code
if parameters is None: if parameters is None:
return urlopen(Request(endpoint, headers=self.headers, method=method), cafile=cafile).read().decode('utf-8') return (
urlopen(
Request(endpoint, headers=self.headers, method=method),
cafile=cafile,
)
.read()
.decode("utf-8")
)
else: else:
data = json.JSONEncoder().encode(parameters) data = json.JSONEncoder().encode(parameters)
binary_data = data.encode('utf-8') binary_data = data.encode("utf-8")
# make request and read the response # make request and read the response
return urlopen( return (
Request(endpoint, data=binary_data, headers=self.headers, method=method), cafile=cafile urlopen(
).read().decode('utf-8') Request(
endpoint, data=binary_data, headers=self.headers, method=method
),
cafile=cafile,
)
.read()
.decode("utf-8")
)
# --------------------------------- # ---------------------------------
# Methods for managing Time Entries # Methods for managing Time Entries
# --------------------------------- # ---------------------------------
def startTimeEntry(self, description, pid=None, tid=None, tags=None): def startTimeEntry(self, description, pid=None, tid=None, tags=None):
'''starts a new Time Entry''' """starts a new Time Entry"""
data = { data = {
"time_entry": { "time_entry": {"created_with": self.user_agent, "description": description}
"created_with": self.user_agent,
"description": description
}
} }
if pid: if pid:
data["time_entry"]["pid"] = pid data["time_entry"]["pid"] = pid
@ -145,22 +168,33 @@ class Toggl():
if tags: if tags:
data["time_entry"]["tags"] = tags data["time_entry"]["tags"] = tags
response = self.postRequest(Endpoints.START_TIME, parameters=data) response = self.postRequest(Endpoints.START_TIME, parameters=data)
return self.decodeJSON(response) return self.decodeJSON(response)
def currentRunningTimeEntry(self): def currentRunningTimeEntry(self):
'''Gets the Current Time Entry''' """Gets the Current Time Entry"""
return self.request(Endpoints.CURRENT_RUNNING_TIME) return self.request(Endpoints.CURRENT_RUNNING_TIME)
def stopTimeEntry(self, entryid): def stopTimeEntry(self, entryid):
'''Stop the time entry''' """Stop the time entry"""
response = self.postRequest(Endpoints.STOP_TIME(entryid), method='PUT') response = self.postRequest(Endpoints.STOP_TIME(entryid), method="PUT")
return self.decodeJSON(response) return self.decodeJSON(response)
def createTimeEntry(self, hourduration, description=None, projectid=None, projectname=None, def createTimeEntry(
taskid=None, clientname=None, year=None, month=None, day=None, hour=None, self,
billable=False, hourdiff=-2): hourduration,
description=None,
projectid=None,
projectname=None,
taskid=None,
clientname=None,
year=None,
month=None,
day=None,
hour=None,
billable=False,
hourdiff=-2,
):
""" """
Creating a custom time entry, minimum must is hour duration and project param Creating a custom time entry, minimum must is hour duration and project param
:param hourduration: :param hourduration:
@ -175,87 +209,95 @@ class Toggl():
:param hour: Taken from now() if not provided :param hour: Taken from now() if not provided
:return: response object from post call :return: response object from post call
""" """
data = { data = {"time_entry": {}}
"time_entry": {}
}
if not projectid: if not projectid:
if projectname and clientname: if projectname and clientname:
projectid = (self.getClientProject(clientname, projectname))['data']['id'] projectid = (self.getClientProject(clientname, projectname))["data"][
"id"
]
elif projectname: elif projectname:
projectid = (self.searchClientProject(projectname))['data']['id'] projectid = (self.searchClientProject(projectname))["data"]["id"]
else: else:
print('Too many missing parameters for query') print("Too many missing parameters for query")
exit(1) exit(1)
if description: if description:
data['time_entry']['description'] = description data["time_entry"]["description"] = description
if taskid: if taskid:
data['time_entry']['tid'] = taskid data["time_entry"]["tid"] = taskid
year = datetime.now().year if not year else year year = datetime.now().year if not year else year
month = datetime.now().month if not month else month month = datetime.now().month if not month else month
day = datetime.now().day if not day else day day = datetime.now().day if not day else day
hour = datetime.now().hour if not hour else hour hour = datetime.now().hour if not hour else hour
timestruct = datetime(year, month, day, hour + hourdiff).isoformat() + '.000Z' timestruct = datetime(year, month, day, hour + hourdiff).isoformat() + ".000Z"
data['time_entry']['start'] = timestruct data["time_entry"]["start"] = timestruct
data['time_entry']['duration'] = hourduration * 3600 data["time_entry"]["duration"] = hourduration * 3600
data['time_entry']['pid'] = projectid data["time_entry"]["pid"] = projectid
data['time_entry']['created_with'] = 'NAME' data["time_entry"]["created_with"] = "NAME"
data['time_entry']['billable'] = billable data["time_entry"]["billable"] = billable
response = self.postRequest(Endpoints.TIME_ENTRIES, parameters=data) response = self.postRequest(Endpoints.TIME_ENTRIES, parameters=data)
return self.decodeJSON(response) return self.decodeJSON(response)
def putTimeEntry(self, parameters): def putTimeEntry(self, parameters):
if 'id' not in parameters: if "id" not in parameters:
raise Exception("An id must be provided in order to put a time entry") raise Exception("An id must be provided in order to put a time entry")
id = parameters['id'] id = parameters["id"]
if type(id) is not int: if type(id) is not int:
raise Exception("Invalid id %s provided " % (id)) raise Exception("Invalid id %s provided " % (id))
endpoint = Endpoints.TIME_ENTRIES + "/" + str(id) # encode all of our data for a put request & modify the URL endpoint = (
data = json.JSONEncoder().encode({'time_entry': parameters}) Endpoints.TIME_ENTRIES + "/" + str(id)
) # encode all of our data for a put request & modify the URL
data = json.JSONEncoder().encode({"time_entry": parameters})
request = Request(endpoint, data=data, headers=self.headers) request = Request(endpoint, data=data, headers=self.headers)
request.get_method = lambda: "PUT" request.get_method = lambda: "PUT"
return json.loads(urlopen(request).read()) return json.loads(urlopen(request).read())
def deleteTimeEntry(self, entryid): def deleteTimeEntry(self, entryid):
""" """
Delete the specified timeEntry Delete the specified timeEntry
:param entryid: The id of the entry to delete :param entryid: The id of the entry to delete
""" """
response = self.postRequest(Endpoints.TIME_ENTRIES + '/{0}'.format(entryid), method='DELETE') response = self.postRequest(
Endpoints.TIME_ENTRIES + "/{0}".format(entryid), method="DELETE"
)
return response return response
# ---------------------------------- # ----------------------------------
# Methods for getting workspace data # Methods for getting workspace data
# ---------------------------------- # ----------------------------------
def getWorkspaces(self): def getWorkspaces(self):
'''return all the workspaces for a user''' """return all the workspaces for a user"""
return self.request(Endpoints.WORKSPACES) return self.request(Endpoints.WORKSPACES)
def getWorkspace(self, name=None, id=None): def getWorkspace(self, name=None, id=None):
'''return the first workspace that matches a given name or id''' """return the first workspace that matches a given name or id"""
workspaces = self.getWorkspaces() # get all workspaces workspaces = self.getWorkspaces() # get all workspaces
# if they give us nothing let them know we're not returning anything # if they give us nothing let them know we're not returning anything
if name is None and id is None: if name is None and id is None:
print("Error in getWorkspace(), please enter either a name or an id as a filter") print(
"Error in getWorkspace(), please enter either a name or an id as a filter"
)
return None return None
if id is None: # then we search by name if id is None: # then we search by name
for workspace in workspaces: # search through them for one matching the name provided for (
if workspace['name'] == name: workspace
) in workspaces: # search through them for one matching the name provided
if workspace["name"] == name:
return workspace # if we find it return it return workspace # if we find it return it
return None # if we get to here and haven't found it return None return None # if we get to here and haven't found it return None
else: # otherwise search by id else: # otherwise search by id
for workspace in workspaces: # search through them for one matching the id provided for (
if workspace['id'] == int(id): workspace
) in workspaces: # search through them for one matching the id provided
if workspace["id"] == int(id):
return workspace # if we find it return it return workspace # if we find it return it
return None # if we get to here and haven't found it return None return None # if we get to here and haven't found it return None
@ -266,43 +308,51 @@ class Toggl():
:return: Projects object returned from endpoint :return: Projects object returned from endpoint
""" """
return self.request(Endpoints.WORKSPACES + '/{0}'.format(id) + '/projects') return self.request(Endpoints.WORKSPACES + "/{0}".format(id) + "/projects")
# ------------------------------- # -------------------------------
# Methods for getting client data # Methods for getting client data
# ------------------------------- # -------------------------------
def getClients(self): def getClients(self):
'''return all clients that are visable to a user''' """return all clients that are visable to a user"""
return self.request(Endpoints.CLIENTS) return self.request(Endpoints.CLIENTS)
def getClient(self, name=None, id=None): def getClient(self, name=None, id=None):
'''return the first workspace that matches a given name or id''' """return the first workspace that matches a given name or id"""
clients = self.getClients() # get all clients clients = self.getClients() # get all clients
# if they give us nothing let them know we're not returning anything # if they give us nothing let them know we're not returning anything
if name is None and id is None: if name is None and id is None:
print("Error in getClient(), please enter either a name or an id as a filter") print(
"Error in getClient(), please enter either a name or an id as a filter"
)
return None return None
if id is None: # then we search by name if id is None: # then we search by name
for client in clients: # search through them for one matching the name provided for (
if client['name'] == name: client
) in clients: # search through them for one matching the name provided
if client["name"] == name:
return client # if we find it return it return client # if we find it return it
return None # if we get to here and haven't found it return None return None # if we get to here and haven't found it return None
else: # otherwise search by id else: # otherwise search by id
for client in clients: # search through them for one matching the id provided for (
if client['id'] == int(id): client
) in clients: # search through them for one matching the id provided
if client["id"] == int(id):
return client # if we find it return it return client # if we find it return it
return None # if we get to here and haven't found it return None return None # if we get to here and haven't found it return None
def getClientProjects(self, id, active='true'): def getClientProjects(self, id, active="true"):
""" """
:param id: Client ID by which to query :param id: Client ID by which to query
:param active: possible values true/false/both. By default true. If false, only archived projects are returned. :param active: possible values true/false/both. By default true. If false, only archived projects are returned.
:return: Projects object returned from endpoint :return: Projects object returned from endpoint
""" """
return self.request(Endpoints.CLIENTS + '/{0}/projects?active={1}'.format(id, active)) return self.request(
Endpoints.CLIENTS + "/{0}/projects?active={1}".format(id, active)
)
def searchClientProject(self, name): def searchClientProject(self, name):
""" """
@ -314,13 +364,13 @@ class Toggl():
""" """
for client in self.getClients(): for client in self.getClients():
try: try:
for project in self.getClientProjects(client['id']): for project in self.getClientProjects(client["id"]):
if project['name'] == name: if project["name"] == name:
return project return project
except Exception: except Exception:
continue continue
print('Could not find client by the name') print("Could not find client by the name")
return None return None
def getClientProject(self, clientName, projectName): def getClientProject(self, clientName, projectName):
@ -331,19 +381,19 @@ class Toggl():
:return: :return:
""" """
for client in self.getClients(): for client in self.getClients():
if client['name'] == clientName: if client["name"] == clientName:
cid = client['id'] cid = client["id"]
if not cid: if not cid:
print('Could not find such client name') print("Could not find such client name")
return None return None
for projct in self.getClientProjects(cid): for projct in self.getClientProjects(cid):
if projct['name'] == projectName: if projct["name"] == projectName:
pid = projct['id'] pid = projct["id"]
if not pid: if not pid:
print('Could not find such project name') print("Could not find such project name")
return None return None
return self.getProject(pid) return self.getProject(pid)
@ -352,8 +402,8 @@ class Toggl():
# Methods for getting PROJECTS data # Methods for getting PROJECTS data
# -------------------------------- # --------------------------------
def getProject(self, pid): def getProject(self, pid):
'''return all projects that are visable to a user''' """return all projects that are visable to a user"""
return self.request(Endpoints.PROJECTS + '/{0}'.format(pid)) return self.request(Endpoints.PROJECTS + "/{0}".format(pid))
def getProjectTasks(self, pid, archived=False): def getProjectTasks(self, pid, archived=False):
""" """
@ -361,7 +411,7 @@ class Toggl():
:param pid: Project ID :param pid: Project ID
:param archived: choose wether to fetch archived tasks or not :param archived: choose wether to fetch archived tasks or not
""" """
return self.request(Endpoints.PROJECTS + '/{0}'.format(pid) + '/tasks') return self.request(Endpoints.PROJECTS + "/{0}".format(pid) + "/tasks")
# -------------------------------- # --------------------------------
# Methods for interacting with TASKS data # Methods for interacting with TASKS data
@ -377,11 +427,11 @@ class Toggl():
""" """
data = {} data = {}
data['task'] = {} data["task"] = {}
data['task']['name'] = name data["task"]["name"] = name
data['task']['pid'] = pid data["task"]["pid"] = pid
data['task']['active'] = active data["task"]["active"] = active
data['task']['estimated_seconds'] = estimatedSeconds data["task"]["estimated_seconds"] = estimatedSeconds
response = self.postRequest(Endpoints.TASKS, parameters=data) response = self.postRequest(Endpoints.TASKS, parameters=data)
return self.decodeJSON(response) return self.decodeJSON(response)
@ -390,11 +440,11 @@ class Toggl():
# Methods for getting reports data # Methods for getting reports data
# --------------------------------- # ---------------------------------
def getWeeklyReport(self, data): def getWeeklyReport(self, data):
'''return a weekly report for a user''' """return a weekly report for a user"""
return self.request(Endpoints.REPORT_WEEKLY, parameters=data) return self.request(Endpoints.REPORT_WEEKLY, parameters=data)
def getWeeklyReportPDF(self, data, filename): def getWeeklyReportPDF(self, data, filename):
'''save a weekly report as a PDF''' """save a weekly report as a PDF"""
# get the raw pdf file data # get the raw pdf file data
filedata = self.requestRaw(Endpoints.REPORT_WEEKLY + ".pdf", parameters=data) filedata = self.requestRaw(Endpoints.REPORT_WEEKLY + ".pdf", parameters=data)
@ -403,26 +453,32 @@ class Toggl():
pdf.write(filedata) pdf.write(filedata)
def getDetailedReport(self, data): def getDetailedReport(self, data):
'''return a detailed report for a user''' """return a detailed report for a user"""
return self.request(Endpoints.REPORT_DETAILED, parameters=data) return self.request(Endpoints.REPORT_DETAILED, parameters=data)
def getDetailedReportPages(self, data): def getDetailedReportPages(self, data):
'''return detailed report data from all pages for a user''' """return detailed report data from all pages for a user"""
pages_index = 1 pages_index = 1
data['page'] = pages_index data["page"] = pages_index
pages = self.request(Endpoints.REPORT_DETAILED, parameters=data) pages = self.request(Endpoints.REPORT_DETAILED, parameters=data)
try: try:
pages_number = math.ceil(pages.get('total_count', 0) / pages.get('per_page', 0)) pages_number = math.ceil(
pages.get("total_count", 0) / pages.get("per_page", 0)
)
except ZeroDivisionError: except ZeroDivisionError:
pages_number = 0 pages_number = 0
for pages_index in range(2, pages_number + 1): for pages_index in range(2, pages_number + 1):
time.sleep(1) # There is rate limiting of 1 request per second (per IP per API token). time.sleep(
data['page'] = pages_index 1
pages['data'].extend(self.request(Endpoints.REPORT_DETAILED, parameters=data).get('data', [])) ) # There is rate limiting of 1 request per second (per IP per API token).
data["page"] = pages_index
pages["data"].extend(
self.request(Endpoints.REPORT_DETAILED, parameters=data).get("data", [])
)
return pages return pages
def getDetailedReportPDF(self, data, filename): def getDetailedReportPDF(self, data, filename):
'''save a detailed report as a pdf''' """save a detailed report as a pdf"""
# get the raw pdf file data # get the raw pdf file data
filedata = self.requestRaw(Endpoints.REPORT_DETAILED + ".pdf", parameters=data) filedata = self.requestRaw(Endpoints.REPORT_DETAILED + ".pdf", parameters=data)
@ -431,7 +487,7 @@ class Toggl():
pdf.write(filedata) pdf.write(filedata)
def getDetailedReportCSV(self, data, filename=None): def getDetailedReportCSV(self, data, filename=None):
'''save a detailed report as a csv''' """save a detailed report as a csv"""
# get the raw pdf file data # get the raw pdf file data
filedata = self.requestRaw(Endpoints.REPORT_DETAILED + ".csv", parameters=data) filedata = self.requestRaw(Endpoints.REPORT_DETAILED + ".csv", parameters=data)
@ -443,11 +499,11 @@ class Toggl():
return filedata return filedata
def getSummaryReport(self, data): def getSummaryReport(self, data):
'''return a summary report for a user''' """return a summary report for a user"""
return self.request(Endpoints.REPORT_SUMMARY, parameters=data) return self.request(Endpoints.REPORT_SUMMARY, parameters=data)
def getSummaryReportPDF(self, data, filename): def getSummaryReportPDF(self, data, filename):
'''save a summary report as a pdf''' """save a summary report as a pdf"""
# get the raw pdf file data # get the raw pdf file data
filedata = self.requestRaw(Endpoints.REPORT_SUMMARY + ".pdf", parameters=data) filedata = self.requestRaw(Endpoints.REPORT_SUMMARY + ".pdf", parameters=data)
@ -467,10 +523,10 @@ class Toggl():
""" """
data = {} data = {}
data['client'] = {} data["client"] = {}
data['client']['name'] = name data["client"]["name"] = name
data['client']['wid'] = wid data["client"]["wid"] = wid
data['client']['notes'] = notes data["client"]["notes"] = notes
response = self.postRequest(Endpoints.CLIENTS, parameters=data) response = self.postRequest(Endpoints.CLIENTS, parameters=data)
return self.decodeJSON(response) return self.decodeJSON(response)
@ -484,11 +540,13 @@ class Toggl():
""" """
data = {} data = {}
data['client'] = {} data["client"] = {}
data['client']['name'] = name data["client"]["name"] = name
data['client']['notes'] = notes data["client"]["notes"] = notes
response = self.postRequest(Endpoints.CLIENTS + '/{0}'.format(id), parameters=data, method='PUT') response = self.postRequest(
Endpoints.CLIENTS + "/{0}".format(id), parameters=data, method="PUT"
)
return self.decodeJSON(response) return self.decodeJSON(response)
def deleteClient(self, id): def deleteClient(self, id):
@ -496,5 +554,7 @@ class Toggl():
Delete the specified client Delete the specified client
:param id: The id of the client to delete :param id: The id of the client to delete
""" """
response = self.postRequest(Endpoints.CLIENTS + '/{0}'.format(id), method='DELETE') response = self.postRequest(
Endpoints.CLIENTS + "/{0}".format(id), method="DELETE"
)
return response return response

View File

@ -1,2 +1,2 @@
__version__ = '0.1.1' __version__ = "0.1.1"
__all__ = ['TogglPy'] __all__ = ["TogglPy"]

View File

@ -18,20 +18,19 @@ from toggl.TogglPy import Toggl
# #
class TogglPyTests(unittest.TestCase): class TogglPyTests(unittest.TestCase):
def setUp(self): def setUp(self):
self.api_key = os.environ['TOGGL_API_KEY'] self.api_key = os.environ["TOGGL_API_KEY"]
if self.api_key is None: if self.api_key is None:
raise Exception("Unable to execute api tests without an api key") raise Exception("Unable to execute api tests without an api key")
self.workspace_id = os.environ['WORKSPACE_ID'] self.workspace_id = os.environ["WORKSPACE_ID"]
if self.workspace_id is None: if self.workspace_id is None:
raise Exception("Unable to execute api tests without a workspace key to query") raise Exception(
"Unable to execute api tests without a workspace key to query"
)
self.toggl = Toggl() self.toggl = Toggl()
self.toggl.setAPIKey(self.api_key) self.toggl.setAPIKey(self.api_key)
def test_connect(self): def test_connect(self):
@ -40,31 +39,34 @@ class TogglPyTests(unittest.TestCase):
def test_putTimeEntry(self): def test_putTimeEntry(self):
request_args = { request_args = {
'workspace_id': self.workspace_id, "workspace_id": self.workspace_id,
} }
entries = self.toggl.getDetailedReport(request_args) entries = self.toggl.getDetailedReport(request_args)
#for this tests I'm tagging my Pomodoro Entries # for this tests I'm tagging my Pomodoro Entries
missing_projects = [r for r in entries['data'] if r['project'] is None and 'Pomodoro' in r['description'] ] missing_projects = [
r
for r in entries["data"]
if r["project"] is None and "Pomodoro" in r["description"]
]
me = missing_projects[0] me = missing_projects[0]
me_id = me['id'] #remember for later me_id = me["id"] # remember for later
#I've tagged my pomodoro entries as Self/Self # I've tagged my pomodoro entries as Self/Self
cp = self.toggl.getClientProject("Self", "Self") cp = self.toggl.getClientProject("Self", "Self")
project_id = cp['data']['id'] project_id = cp["data"]["id"]
me['pid'] = project_id me["pid"] = project_id
#his is the new stuff # his is the new stuff
response = self.toggl.putTimeEntry({"id": me_id, "pid":project_id}) response = self.toggl.putTimeEntry({"id": me_id, "pid": project_id})
self.assertTrue(response is not None) self.assertTrue(response is not None)
self.assertTrue('data' in response) self.assertTrue("data" in response)
self.assertTrue(response['data']['pid'] == project_id) self.assertTrue(response["data"]["pid"] == project_id)
def test_getDetailedReportCSV(self): def test_getDetailedReportCSV(self):
data = { data = {
'workspace_id': self.workspace_id, "workspace_id": self.workspace_id,
} }
csvfile = 'data.csv' csvfile = "data.csv"
self.toggl.getDetailedReportCSV(data, csvfile) self.toggl.getDetailedReportCSV(data, csvfile)
self.assertTrue(os.path.isfile(csvfile)) self.assertTrue(os.path.isfile(csvfile))
os.remove(csvfile) os.remove(csvfile)
@ -72,26 +74,26 @@ class TogglPyTests(unittest.TestCase):
data = self.toggl.getDetailedReportCSV(data) data = self.toggl.getDetailedReportCSV(data)
self.assertTrue(data is not None) self.assertTrue(data is not None)
def test_getDetailedReport(self): def test_getDetailedReport(self):
data = { data = {
'workspace_id': self.workspace_id, "workspace_id": self.workspace_id,
} }
d = self.toggl.getDetailedReport(data) d = self.toggl.getDetailedReport(data)
self.assertTrue(d is not None) self.assertTrue(d is not None)
self.assertTrue(len(d.keys()) > 0 ) self.assertTrue(len(d.keys()) > 0)
fields = ['total_count', 'total_currencies', 'total_billable', 'data'] fields = ["total_count", "total_currencies", "total_billable", "data"]
for f in fields: for f in fields:
self.assertTrue(f in d.keys()) self.assertTrue(f in d.keys())
data = d['data'] data = d["data"]
self.assertTrue(len(data)>0) self.assertTrue(len(data) > 0)
dr = data[0] dr = data[0]
self.assertTrue('client' in dr) self.assertTrue("client" in dr)
self.assertTrue('start' in dr) self.assertTrue("start" in dr)
self.assertTrue('end' in dr) self.assertTrue("end" in dr)
self.assertTrue('task' in dr) self.assertTrue("task" in dr)
self.assertTrue('user' in dr) self.assertTrue("user" in dr)
self.assertTrue('project' in dr) self.assertTrue("project" in dr)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -5,25 +5,26 @@ import gspread
from oauth2client.service_account import ServiceAccountCredentials from oauth2client.service_account import ServiceAccountCredentials
from toggl.TogglPy import Toggl from toggl.TogglPy import Toggl
#this test demonstrates how to link up the toggl API into a google sheet # this test demonstrates how to link up the toggl API into a google sheet
#in order to do this, you'll need to first setup your google account developer environment # in order to do this, you'll need to first setup your google account developer environment
#to do this, you can follow the instructions here: http://tinaja.computer/2017/10/27/gspread.html # to do this, you can follow the instructions here: http://tinaja.computer/2017/10/27/gspread.html
#additional information about the spread API here: https://github.com/burnash/gspread # additional information about the spread API here: https://github.com/burnash/gspread
# as such, to run this test you'll need to define the following env variables
# TOGGL_API_KEY : your toggl api key
# WORKSPACE_ID: a workspace id that you'd like to dump data for
# KEYFILE: the full path to your google suite keyfile (keep this secret/safe!)
# SHEET_URL: the url of the google sheet you are writing to
#as such, to run this test you'll need to define the following env variables
#TOGGL_API_KEY : your toggl api key
#WORKSPACE_ID: a workspace id that you'd like to dump data for
#KEYFILE: the full path to your google suite keyfile (keep this secret/safe!)
#SHEET_URL: the url of the google sheet you are writing to
class Toggl2GSuiteTest(unittest.TestCase): class Toggl2GSuiteTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.api_key = os.environ['TOGGL_API_KEY'] self.api_key = os.environ["TOGGL_API_KEY"]
self.toggl = Toggl() self.toggl = Toggl()
self.toggl.setAPIKey(self.api_key) self.toggl.setAPIKey(self.api_key)
# see https://stackoverflow.com/questions/19153462/get-excel-style-column-names-from-column-number # see https://stackoverflow.com/questions/19153462/get-excel-style-column-names-from-column-number
LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
@staticmethod @staticmethod
def excel_style(row, col): def excel_style(row, col):
@ -32,35 +33,48 @@ class Toggl2GSuiteTest(unittest.TestCase):
while col: while col:
col, rem = divmod(col - 1, 26) col, rem = divmod(col - 1, 26)
result[:0] = Toggl2GSuiteTest.LETTERS[rem] result[:0] = Toggl2GSuiteTest.LETTERS[rem]
return ''.join(result) + str(row) return "".join(result) + str(row)
def test_toggl2gsuite(self): def test_toggl2gsuite(self):
# have to do this year by year # have to do this year by year
data = { data = {
'workspace_id': os.environ['WORKSPACE_ID'], "workspace_id": os.environ["WORKSPACE_ID"],
} }
y = self.toggl.getDetailedReport(data) y = self.toggl.getDetailedReport(data)
credentials = ServiceAccountCredentials.from_json_keyfile_name( credentials = ServiceAccountCredentials.from_json_keyfile_name(
os.environ['KEYFILE'], os.environ["KEYFILE"], ["https://spreadsheets.google.com/feeds"]
['https://spreadsheets.google.com/feeds']) )
client = gspread.authorize(credentials) client = gspread.authorize(credentials)
sheet = client.open_by_url(os.environ['SHEET_URL']) sheet = client.open_by_url(os.environ["SHEET_URL"])
worksheet = sheet.get_worksheet(0) worksheet = sheet.get_worksheet(0)
wrote_header = False wrote_header = False
columns_to_write = ['user', 'updated', 'start', 'end', 'client', 'project', 'description', 'is_billable', columns_to_write = [
'billable'] "user",
"updated",
"start",
"end",
"client",
"project",
"description",
"is_billable",
"billable",
]
cell_row = 0 cell_row = 0
for row_idx, rec in enumerate(y['data']): for row_idx, rec in enumerate(y["data"]):
if wrote_header == False: if wrote_header == False:
for col_idx, header in enumerate(columns_to_write): for col_idx, header in enumerate(columns_to_write):
worksheet.update_acell(Toggl2GSuiteTest.excel_style(row_idx + 1, col_idx + 1), header) worksheet.update_acell(
Toggl2GSuiteTest.excel_style(row_idx + 1, col_idx + 1), header
)
wrote_header = True wrote_header = True
for col_idx, header in enumerate(columns_to_write): for col_idx, header in enumerate(columns_to_write):
worksheet.update_acell(Toggl2GSuiteTest.excel_style(row_idx + 2, col_idx + 1), rec[header]) worksheet.update_acell(
Toggl2GSuiteTest.excel_style(row_idx + 2, col_idx + 1), rec[header]
)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -12,4 +12,4 @@ class BatteryLevelChar(ZeiCharBase.ZeiCharBase):
charUUID = btle.AssignedNumbers.battery_level charUUID = btle.AssignedNumbers.battery_level
def __init__(self, periph): def __init__(self, periph):
super.__init__(self, periph) super.__init__(self, periph)

View File

@ -6,12 +6,12 @@ from bluepy import btle
from . import ZeiOrientationChar from . import ZeiOrientationChar
from . import ZeiDelegate from . import ZeiDelegate
class Zei(btle.Peripheral):
class Zei(btle.Peripheral):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
btle.Peripheral.__init__(self, *args, **kwargs) btle.Peripheral.__init__(self, *args, **kwargs)
self.withDelegate(ZeiDelegate.ZeiDelegate(self)) self.withDelegate(ZeiDelegate.ZeiDelegate(self))
# activate notifications about turn # activate notifications about turn
self.orientation = ZeiOrientationChar.ZeiOrientationChar(self) self.orientation = ZeiOrientationChar.ZeiOrientationChar(self)
self.orientation.enable() self.orientation.enable()

View File

@ -6,12 +6,11 @@ import struct
class ZeiCharBase(object): class ZeiCharBase(object):
def __init__(self, periph): def __init__(self, periph):
self.periph = periph self.periph = periph
self.hndl = None self.hndl = None
#self.svcUUID = None # self.svcUUID = None
#self.charUUID = None # self.charUUID = None
# pylint: disable=E1101 # pylint: disable=E1101
def enable(self): def enable(self):
@ -20,5 +19,7 @@ class ZeiCharBase(object):
self.hndl = _chr.getHandle() self.hndl = _chr.getHandle()
# this is uint16_t - see: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.descriptor.gatt.client_characteristic_configuration.xml # this is uint16_t - see: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.descriptor.gatt.client_characteristic_configuration.xml
_cccd = _chr.getDescriptors(btle.AssignedNumbers.client_characteristic_configuration)[0] _cccd = _chr.getDescriptors(
btle.AssignedNumbers.client_characteristic_configuration
)[0]
_cccd.write(struct.pack("<H", 2), withResponse=True) _cccd.write(struct.pack("<H", 2), withResponse=True)

View File

@ -7,14 +7,13 @@ from .Log import _log
class ZeiDelegate(btle.DefaultDelegate): class ZeiDelegate(btle.DefaultDelegate):
def __init__(self, periph): def __init__(self, periph):
btle.DefaultDelegate.__init__(self) btle.DefaultDelegate.__init__(self)
self.parent = periph self.parent = periph
def handleNotification(self, cHandle, data): def handleNotification(self, cHandle, data):
if cHandle == 38: if cHandle == 38:
side = struct.unpack('B', data)[0] side = struct.unpack("B", data)[0]
_log.info("Current side up is %s", side ) _log.info("Current side up is %s", side)
else: else:
_log.info("Notification from hndl: %s - %r", cHandle, data) _log.info("Notification from hndl: %s - %r", cHandle, data)

View File

@ -6,18 +6,17 @@ from bluepy import btle
class ZeiDiscovery(btle.Scanner): class ZeiDiscovery(btle.Scanner):
def __init__(self, periph=None, **kwargs): def __init__(self, periph=None, **kwargs):
self.zei = periph self.zei = periph
btle.Scanner.__init__(self, **kwargs) btle.Scanner.__init__(self, **kwargs)
#self.withDelegate(ZeiDiscoveryDelegate(self, self.zei)) # self.withDelegate(ZeiDiscoveryDelegate(self, self.zei))
#self.stop_scanning = False # self.stop_scanning = False
def reconnect(self): def reconnect(self):
self.iface=self.zei.iface self.iface = self.zei.iface
self.clear() self.clear()
self.start() self.start()
while self.zei.addr not in self.scanned: while self.zei.addr not in self.scanned:
self.process(timeout=2) self.process(timeout=2)
self.stop() self.stop()
self.zei.connect(self.scanned[self.zei.addr]) self.zei.connect(self.scanned[self.zei.addr])

View File

@ -13,7 +13,7 @@ class ZeiDiscoveryDelegate(btle.DefaultDelegate):
self.periph = periph self.periph = periph
def handleDiscovery(self, dev, isNewDev, isNewData): def handleDiscovery(self, dev, isNewDev, isNewData):
if not dev.addr == 'f1:05:a5:9c:2e:9b': if not dev.addr == "f1:05:a5:9c:2e:9b":
return return
_log.info("Device %s (%s), RSSI=%d dB", dev.addr, dev.addrType, dev.rssi) _log.info("Device %s (%s), RSSI=%d dB", dev.addr, dev.addrType, dev.rssi)
for (_, desc, value) in dev.getScanData(): for (_, desc, value) in dev.getScanData():
@ -22,7 +22,7 @@ class ZeiDiscoveryDelegate(btle.DefaultDelegate):
# bluepy can only do one thing at a time, so stop scanning while trying to connect # bluepy can only do one thing at a time, so stop scanning while trying to connect
# this is not supported by bluepy # this is not supported by bluepy
#self.scanner.stop() # self.scanner.stop()
try: try:
self.periph.connect(dev) self.periph.connect(dev)
@ -30,4 +30,4 @@ class ZeiDiscoveryDelegate(btle.DefaultDelegate):
except: except:
# re # re
self.scanner.start() self.scanner.start()
pass pass

View File

@ -4,8 +4,10 @@
from . import ZeiCharBase from . import ZeiCharBase
def _ZEI_UUID(short_uuid): def _ZEI_UUID(short_uuid):
return 'c7e7%04X-c847-11e6-8175-8c89a55d403c' % (short_uuid) return "c7e7%04X-c847-11e6-8175-8c89a55d403c" % (short_uuid)
class ZeiOrientationChar(ZeiCharBase.ZeiCharBase): class ZeiOrientationChar(ZeiCharBase.ZeiCharBase):
svcUUID = _ZEI_UUID(0x0010) svcUUID = _ZEI_UUID(0x0010)

View File

@ -1 +1 @@
# Empty # Empty