This commit is contained in:
Tobias 2020-08-12 18:44:23 +02:00
parent ee7d7c5559
commit 96bb8a1fb1
14 changed files with 252 additions and 692 deletions

14
.gitignore vendored
View File

@ -140,3 +140,17 @@ dmypy.json
.pytype/
# End of https://www.toptal.com/developers/gitignore/api/python
# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode
# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode
### VisualStudioCode ###
.vscode/*
*.code-workspace
### VisualStudioCode Patch ###
# Ignore all local history of files
.history
# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode
*~

8
Mapping.py Normal file
View File

@ -0,0 +1,8 @@
class Mapping(object):
def __init__(self, side: int, id: int):
self.side = side
self.id = id
self.description = ""
self.tags = []

69
TogglDelegate.py Normal file
View File

@ -0,0 +1,69 @@
import struct
from toggl.TogglPy import Toggl
from zei.ZeiDelegate import ZeiDelegate
from datetime import datetime, timezone
from Mapping import Mapping
import dateutil.parser
import logging
_log = logging.getLogger(__name__)
_log.addHandler(logging.StreamHandler())
_log.setLevel(logging.INFO)
class TogglDelegate(ZeiDelegate):
def __init__(self, periph, config):
self.config = config
self.toggl = Toggl()
self.toggl.setAPIKey(self.config['toggl']['settings']['token'])
self._populateProjects()
self._populateMappings(self.config['mappings'])
super().__init__(periph)
def handleNotification(self, cHandle, data):
if cHandle == 38: # Side Change Notification
side = struct.unpack('B', data)[0]
self._trackProjectByMapping(self.mappings[side])
else:
_log.info("Notification from hndl: %s - %r", cHandle, data)
def _trackProjectByMapping(self, mapping: Mapping):
self._trackProject(description=mapping.description, pid=mapping.id, tags=mapping.tags)
def _trackProject(self, description: str, pid: int, tags: list):
current = self.toggl.currentRunningTimeEntry()['data']
if current is not None:
if (datetime.now(timezone.utc) - dateutil.parser.isoparse(current['start'])).total_seconds() < 20:
# Delete entry if not older than 20s
_log.info("Abort currently running entry")
self.toggl.deleteTimeEntry(current['id'])
else:
_log.info("Stopping currently running entry")
self.toggl.stopTimeEntry(current['id'])
_log.info("Now tracking project %s: %s (%s)", self.projects[pid]['name'], description, ', '.join(tags if tags else []))
if pid == 0:
return
self.toggl.startTimeEntry(description, pid=pid, tags=tags)
def _populateMappings(self, mappings: dict):
self.mappings = { 0: Mapping(0, 0)}
for i in mappings:
self.mappings[int(i)] = Mapping(int(i), int(mappings[i]["id"]))
if "description" in mappings[i]:
self.mappings[int(i)].description = mappings[i]["description"]
if "tags" in mappings[i]:
self.mappings[int(i)].tags = mappings[i]["tags"]
def _populateProjects(self):
self.projects = {}
proj = (self.toggl.getWorkspaceProjects(self.config['toggl']['settings']['workspace_id']))
NoneProj = {'id': 0, 'wid': int(self.config['toggl']['settings']['workspace_id']), 'name': 'None', 'billable': False, 'is_private': True, 'active': True, 'template': False, 'at': '2020-06-09T04:02:38+00:00', 'created_at': '2019-12-09T16:36:28+00:00', 'color': '9', 'auto_estimates': False, 'actual_hours': 0, 'hex_color': '#990099'}
self.projects[0] = NoneProj
for i in proj:
self.projects[i['id']] = i

209
main.py
View File

@ -2,219 +2,20 @@
# -*- coding: utf-8 -*-
#
from bluepy import btle
from toggl.TogglPy import Toggl
import struct
import json
import dateutil.parser
from datetime import datetime, timezone
from TogglDelegate import TogglDelegate
from zei.Zei import Zei
from zei.ZeiDiscovery import ZeiDiscovery
import logging
_log = logging.getLogger(__name__)
_log.addHandler(logging.StreamHandler())
_log.setLevel(logging.INFO)
def _ZEI_UUID(short_uuid):
print( 'c7e7%04X-c847-11e6-8175-8c89a55d403c' % (short_uuid))
return 'c7e7%04X-c847-11e6-8175-8c89a55d403c' % (short_uuid)
class ZeiCharBase:
def __init__(self, periph):
self.periph = periph
self.hndl = None
def enable(self):
_svc = self.periph.getServiceByUUID(self.svcUUID)
_chr = _svc.getCharacteristics(self.charUUID)[0]
self.hndl = _chr.getHandle()
# this is uint16_t - see: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.descriptor.gatt.client_characteristic_configuration.xml
_cccd = _chr.getDescriptors(btle.AssignedNumbers.client_characteristic_configuration)[0]
_cccd.write(struct.pack("<H", 2), withResponse=True)
class ZeiOrientationChar(ZeiCharBase):
svcUUID = _ZEI_UUID(0x0010)
charUUID = _ZEI_UUID(0x0012)
def __init__(self, periph):
ZeiCharBase.__init__(self, periph)
class BatteryLevelChar(ZeiCharBase):
svcUUID = btle.AssignedNumbers.battery_service
charUUID = btle.AssignedNumbers.battery_level
def __init__(self, periph):
ZeiCharBase.__init__(self, periph)
class Zei(btle.Peripheral):
def __init__(self, *args, **kwargs):
btle.Peripheral.__init__(self, *args, **kwargs)
self.withDelegate(ZeiDelegate(self))
# activate notifications about turn
self.orientation = ZeiOrientationChar(self)
self.orientation.enable()
class ZeiDelegate(btle.DefaultDelegate):
def __init__(self, periph):
btle.DefaultDelegate.__init__(self)
self.parent = periph
def handleNotification(self, cHandle, data):
if cHandle == 38:
side = struct.unpack('B', data)[0]
_log.info("Current side up is %s", side )
else:
_log.info("Notification from hndl: %s - %r", cHandle, data)
class ZeiDiscoveryDelegate(btle.DefaultDelegate):
def __init__(self, scanner, periph):
btle.DefaultDelegate.__init__(self)
self.scanner = scanner
self.periph = periph
def handleDiscovery(self, dev, isNewDev, isNewData):
if not dev.addr == 'f1:05:a5:9c:2e:9b':
return
_log.info("Device %s (%s), RSSI=%d dB", dev.addr, dev.addrType, dev.rssi)
for (adtype, desc, value) in dev.getScanData():
_log.info(" %s = %s", desc, value)
# reconnect
# bluepy can only do one thing at a time, so stop scanning while trying to connect
# this is not supported by bluepy
#self.scanner.stop()
try:
self.periph.connect(dev)
self.scanner.stop_scanning = True
except:
# re
self.scanner.start()
pass
class ZeiDiscovery(btle.Scanner):
def __init__(self, periph=None, **kwargs):
self.zei = periph
btle.Scanner.__init__(self, **kwargs)
#self.withDelegate(ZeiDiscoveryDelegate(self, self.zei))
#self.stop_scanning = False
def reconnect(self):
self.iface=self.zei.iface
self.clear()
self.start()
while self.zei.addr not in self.scanned:
self.process(timeout=2)
self.stop()
self.zei.connect(self.scanned[self.zei.addr])
class TogglDelegate(ZeiDelegate):
def __init__(self, periph, config):
self.config = config
self.toggl = Toggl()
self.toggl.setAPIKey(self.config['toggl']['settings']['token'])
self._populateProjects()
super().__init__(periph)
def handleNotification(self, cHandle, data):
if cHandle == 38:
side = struct.unpack('B', data)[0]
self._trackProject(self._getDescriptionBySide(side), self._getIdBySide(side), self._getTagsBySide(side))
else:
_log.info("Notification from hndl: %s - %r", cHandle, data)
def _getIdBySide(self, side: int):
if str(side) in self.config['mappings']:
return self.config['mappings'][str(side)]['id']
else:
return 0
def _getTagsBySide(self, side: int):
if str(side) in self.config['mappings'] and 'tags' in self.config['mappings'][str(side)]:
return self.config['mappings'][str(side)]['tags']
else:
return None
def _getDescriptionBySide(self, side: int):
if str(side) in self.config['mappings'] and 'description' in self.config['mappings'][str(side)]:
return self.config['mappings'][str(side)]['description']
else:
return ''
def _getProjectById(self, id: int):
return self.projects[int(id)]
def _populateProjects(self):
self.projects = {}
proj = (self.toggl.getWorkspaceProjects(self.config['toggl']['settings']['workspace_id']))
NoneProj = {'id': 0, 'wid': int(self.config['toggl']['settings']['workspace_id']), 'name': 'None', 'billable': False, 'is_private': True, 'active': True, 'template': False, 'at': '2020-06-09T04:02:38+00:00', 'created_at': '2019-12-09T16:36:28+00:00', 'color': '9', 'auto_estimates': False, 'actual_hours': 0, 'hex_color': '#990099'}
self.projects[0] = NoneProj
for i in proj:
self.projects[i['id']] = i
def _trackProject(self, description: str, pid: int, tags: list):
current = self.toggl.currentRunningTimeEntry()['data']
if current is not None:
if (datetime.now(timezone.utc) - dateutil.parser.isoparse(current['start'])).total_seconds() < 20:
# Delete entry if not older than 20s
_log.info("Abort currently running entry")
self.toggl.deleteTimeEntry(current['id'])
else:
_log.info("Stopping currently running entry")
self.toggl.stopTimeEntry(current['id'])
_log.info("Now tracking project %s: %s (%s)", self._getProjectById(pid)['name'], description, ', '.join(tags if tags else []))
if pid == 0:
return
self.toggl.startTimeEntry(description, pid=pid, tags=tags )
def main():
# config = {
# 'toggl': {
# 'settings': {
# 'token': 'XXXXX',
# 'user_agent': 'Toggl-Zei-Py',
# 'workspace_id': '2629429'
# }
# },
# 'mappings': {
# "7": {
# 'id': '157907853',
# 'description': 'Test'
# }
# }
# }
#json.dump(config, open('./config.json', 'w'), sort_keys=True, indent=4)
config = json.load(open("./config.json", "r"))
zei = Zei('c5:58:ed:89:90:ba', 'random', iface=0)
zei = Zei(config["zei"]["mac"], 'random', iface=0)
zei.withDelegate(TogglDelegate(zei, config))
scanner = ZeiDiscovery(zei)

View File

@ -1,488 +0,0 @@
"""
TogglPy is a non-cluttered, easily understood and implemented
library for interacting with the Toggl API.
"""
import json # parsing json data
import math
import sys
import time
from base64 import b64encode
from datetime import datetime
# for making requests
# backward compatibility with python2
cafile = None
if sys.version[0] == "2":
from urllib import urlencode
from urllib2 import urlopen, Request
else:
from urllib.parse import urlencode
from urllib.request import urlopen, Request
try:
import certifi
cafile = certifi.where()
except ImportError:
pass
# --------------------------------------------
# Class containing the endpoint URLs for Toggl
# --------------------------------------------
class Endpoints():
WORKSPACES = "https://www.toggl.com/api/v8/workspaces"
CLIENTS = "https://www.toggl.com/api/v8/clients"
PROJECTS = "https://www.toggl.com/api/v8/projects"
TASKS = "https://www.toggl.com/api/v8/tasks"
REPORT_WEEKLY = "https://toggl.com/reports/api/v2/weekly"
REPORT_DETAILED = "https://toggl.com/reports/api/v2/details"
REPORT_SUMMARY = "https://toggl.com/reports/api/v2/summary"
START_TIME = "https://www.toggl.com/api/v8/time_entries/start"
TIME_ENTRIES = "https://www.toggl.com/api/v8/time_entries"
CURRENT_RUNNING_TIME = "https://www.toggl.com/api/v8/time_entries/current"
@staticmethod
def STOP_TIME(pid):
return "https://www.toggl.com/api/v8/time_entries/" + str(pid) + "/stop"
# ------------------------------------------------------
# Class containing the necessities for Toggl interaction
# ------------------------------------------------------
class Toggl():
# template of headers for our request
headers = {
"Authorization": "",
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": "python/urllib",
}
# default API user agent value
user_agent = "TogglPy"
# ------------------------------------------------------------
# Auxiliary methods
# ------------------------------------------------------------
def decodeJSON(self, jsonString):
return json.JSONDecoder().decode(jsonString)
# ------------------------------------------------------------
# Methods that modify the headers to control our HTTP requests
# ------------------------------------------------------------
def setAPIKey(self, APIKey):
'''set the API key in the request header'''
# craft the Authorization
authHeader = APIKey + ":" + "api_token"
authHeader = "Basic " + b64encode(authHeader.encode()).decode('ascii').rstrip()
# add it into the header
self.headers['Authorization'] = authHeader
def setAuthCredentials(self, email, password):
authHeader = '{0}:{1}'.format(email, password)
authHeader = "Basic " + b64encode(authHeader.encode()).decode('ascii').rstrip()
# add it into the header
self.headers['Authorization'] = authHeader
def setUserAgent(self, agent):
'''set the User-Agent setting, by default it's set to TogglPy'''
self.user_agent = agent
# -----------------------------------------------------
# Methods for directly requesting data from an endpoint
# -----------------------------------------------------
def requestRaw(self, endpoint, parameters=None):
'''make a request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)'''
if parameters is None:
return urlopen(Request(endpoint, headers=self.headers), cafile=cafile).read()
else:
if 'user_agent' not in parameters:
parameters.update({'user_agent': self.user_agent}) # add our class-level user agent in there
# encode all of our data for a get request & modify the URL
endpoint = endpoint + "?" + urlencode(parameters)
# make request and read the response
return urlopen(Request(endpoint, headers=self.headers), cafile=cafile).read()
def request(self, endpoint, parameters=None):
'''make a request to the toggle api at a certain endpoint and return the page data as a parsed JSON dict'''
return json.loads(self.requestRaw(endpoint, parameters).decode('utf-8'))
def postRequest(self, endpoint, parameters=None, method='POST'):
'''make a POST request to the toggle api at a certain endpoint and return the RAW page data (usually JSON)'''
if method == 'DELETE': # Calls to the API using the DELETE mothod return a HTTP response rather than JSON
return urlopen(Request(endpoint, headers=self.headers, method=method), cafile=cafile).code
if parameters is None:
return urlopen(Request(endpoint, headers=self.headers, method=method), cafile=cafile).read().decode('utf-8')
else:
data = json.JSONEncoder().encode(parameters)
binary_data = data.encode('utf-8')
# make request and read the response
return urlopen(
Request(endpoint, data=binary_data, headers=self.headers, method=method), cafile=cafile
).read().decode('utf-8')
# ---------------------------------
# Methods for managing Time Entries
# ---------------------------------
def startTimeEntry(self, description, pid=None, tid=None):
'''starts a new Time Entry'''
data = {
"time_entry": {
"created_with": self.user_agent,
"description": description
}
}
if pid:
data["time_entry"]["pid"] = pid
if tid:
data["time_entry"]["tid"] = tid
response = self.postRequest(Endpoints.START_TIME, parameters=data)
return self.decodeJSON(response)
def currentRunningTimeEntry(self):
'''Gets the Current Time Entry'''
response = self.postRequest(Endpoints.CURRENT_RUNNING_TIME)
return self.decodeJSON(response)
def stopTimeEntry(self, entryid):
'''Stop the time entry'''
response = self.postRequest(Endpoints.STOP_TIME(entryid))
return self.decodeJSON(response)
def createTimeEntry(self, hourduration, description=None, projectid=None, projectname=None,
taskid=None, clientname=None, year=None, month=None, day=None, hour=None,
billable=False, hourdiff=-2):
"""
Creating a custom time entry, minimum must is hour duration and project param
:param hourduration:
:param description: Sets a descripton for the newly created time entry
:param projectid: Not required if projectname given
:param projectname: Not required if projectid was given
:param taskid: Adds a task to the time entry (Requirement: Toggl Starter or higher)
:param clientname: Can speed up project query process
:param year: Taken from now() if not provided
:param month: Taken from now() if not provided
:param day: Taken from now() if not provided
:param hour: Taken from now() if not provided
:return: response object from post call
"""
data = {
"time_entry": {}
}
if not projectid:
if projectname and clientname:
projectid = (self.getClientProject(clientname, projectname))['data']['id']
elif projectname:
projectid = (self.searchClientProject(projectname))['data']['id']
else:
print('Too many missing parameters for query')
exit(1)
if description:
data['time_entry']['description'] = description
if taskid:
data['time_entry']['tid'] = taskid
year = datetime.now().year if not year else year
month = datetime.now().month if not month else month
day = datetime.now().day if not day else day
hour = datetime.now().hour if not hour else hour
timestruct = datetime(year, month, day, hour + hourdiff).isoformat() + '.000Z'
data['time_entry']['start'] = timestruct
data['time_entry']['duration'] = hourduration * 3600
data['time_entry']['pid'] = projectid
data['time_entry']['created_with'] = 'NAME'
data['time_entry']['billable'] = billable
response = self.postRequest(Endpoints.TIME_ENTRIES, parameters=data)
return self.decodeJSON(response)
def putTimeEntry(self, parameters):
if 'id' not in parameters:
raise Exception("An id must be provided in order to put a time entry")
id = parameters['id']
if type(id) is not int:
raise Exception("Invalid id %s provided " % (id))
endpoint = Endpoints.TIME_ENTRIES + "/" + str(id) # encode all of our data for a put request & modify the URL
data = json.JSONEncoder().encode({'time_entry': parameters})
request = Request(endpoint, data=data, headers=self.headers)
request.get_method = lambda: "PUT"
return json.loads(urlopen(request).read())
# ----------------------------------
# Methods for getting workspace data
# ----------------------------------
def getWorkspaces(self):
'''return all the workspaces for a user'''
return self.request(Endpoints.WORKSPACES)
def getWorkspace(self, name=None, id=None):
'''return the first workspace that matches a given name or id'''
workspaces = self.getWorkspaces() # get all workspaces
# if they give us nothing let them know we're not returning anything
if name is None and id is None:
print("Error in getWorkspace(), please enter either a name or an id as a filter")
return None
if id is None: # then we search by name
for workspace in workspaces: # search through them for one matching the name provided
if workspace['name'] == name:
return workspace # if we find it return it
return None # if we get to here and haven't found it return None
else: # otherwise search by id
for workspace in workspaces: # search through them for one matching the id provided
if workspace['id'] == int(id):
return workspace # if we find it return it
return None # if we get to here and haven't found it return None
def getWorkspaceProjects(self, id):
"""
Return all of the projects for a given Workspace
:param id: Workspace ID by which to query
:return: Projects object returned from endpoint
"""
return self.request(Endpoints.WORKSPACES + '/{0}'.format(id) + '/projects')
# -------------------------------
# Methods for getting client data
# -------------------------------
def getClients(self):
'''return all clients that are visable to a user'''
return self.request(Endpoints.CLIENTS)
def getClient(self, name=None, id=None):
'''return the first workspace that matches a given name or id'''
clients = self.getClients() # get all clients
# if they give us nothing let them know we're not returning anything
if name is None and id is None:
print("Error in getClient(), please enter either a name or an id as a filter")
return None
if id is None: # then we search by name
for client in clients: # search through them for one matching the name provided
if client['name'] == name:
return client # if we find it return it
return None # if we get to here and haven't found it return None
else: # otherwise search by id
for client in clients: # search through them for one matching the id provided
if client['id'] == int(id):
return client # if we find it return it
return None # if we get to here and haven't found it return None
def getClientProjects(self, id, active='true'):
"""
:param id: Client ID by which to query
:param active: possible values true/false/both. By default true. If false, only archived projects are returned.
:return: Projects object returned from endpoint
"""
return self.request(Endpoints.CLIENTS + '/{0}/projects?active={1}'.format(id, active))
def searchClientProject(self, name):
"""
Provide only a projects name for query and search through entire available names
WARNING: Takes a long time!
If client name is known, 'getClientProject' would be advised
:param name: Desired Project's name
:return: Project object
"""
for client in self.getClients():
try:
for project in self.getClientProjects(client['id']):
if project['name'] == name:
return project
except Exception:
continue
print('Could not find client by the name')
return None
def getClientProject(self, clientName, projectName):
"""
Fast query given the Client's name and Project's name
:param clientName:
:param projectName:
:return:
"""
for client in self.getClients():
if client['name'] == clientName:
cid = client['id']
if not cid:
print('Could not find such client name')
return None
for projct in self.getClientProjects(cid):
if projct['name'] == projectName:
pid = projct['id']
if not pid:
print('Could not find such project name')
return None
return self.getProject(pid)
# --------------------------------
# Methods for getting PROJECTS data
# --------------------------------
def getProject(self, pid):
'''return all projects that are visable to a user'''
return self.request(Endpoints.PROJECTS + '/{0}'.format(pid))
def getProjectTasks(self, pid, archived=False):
"""
return all tasks of a given project
:param pid: Project ID
:param archived: choose wether to fetch archived tasks or not
"""
return self.request(Endpoints.PROJECTS + '/{0}'.format(pid) + '/tasks')
# --------------------------------
# Methods for interacting with TASKS data
# --------------------------------
def createTask(self, name, pid, active=True, estimatedSeconds=False):
"""
create a new task (Requirement: Toggl Starter or higher)
:param name: Name of the task
:param pid: Project ID
:param active: Defines if the task is active or archived, default: active
:param estimatedSeconds: Estimation for the task in seconds
"""
data = {}
data['task'] = {}
data['task']['name'] = name
data['task']['pid'] = pid
data['task']['active'] = active
data['task']['estimated_seconds'] = estimatedSeconds
response = self.postRequest(Endpoints.TASKS, parameters=data)
return self.decodeJSON(response)
# --------------------------------
# Methods for getting reports data
# ---------------------------------
def getWeeklyReport(self, data):
'''return a weekly report for a user'''
return self.request(Endpoints.REPORT_WEEKLY, parameters=data)
def getWeeklyReportPDF(self, data, filename):
'''save a weekly report as a PDF'''
# get the raw pdf file data
filedata = self.requestRaw(Endpoints.REPORT_WEEKLY + ".pdf", parameters=data)
# write the data to a file
with open(filename, "wb") as pdf:
pdf.write(filedata)
def getDetailedReport(self, data):
'''return a detailed report for a user'''
return self.request(Endpoints.REPORT_DETAILED, parameters=data)
def getDetailedReportPages(self, data):
'''return detailed report data from all pages for a user'''
pages_index = 1
data['page'] = pages_index
pages = self.request(Endpoints.REPORT_DETAILED, parameters=data)
try:
pages_number = math.ceil(pages.get('total_count', 0) / pages.get('per_page', 0))
except ZeroDivisionError:
pages_number = 0
for pages_index in range(2, pages_number + 1):
time.sleep(1) # There is rate limiting of 1 request per second (per IP per API token).
data['page'] = pages_index
pages['data'].extend(self.request(Endpoints.REPORT_DETAILED, parameters=data).get('data', []))
return pages
def getDetailedReportPDF(self, data, filename):
'''save a detailed report as a pdf'''
# get the raw pdf file data
filedata = self.requestRaw(Endpoints.REPORT_DETAILED + ".pdf", parameters=data)
# write the data to a file
with open(filename, "wb") as pdf:
pdf.write(filedata)
def getDetailedReportCSV(self, data, filename=None):
'''save a detailed report as a csv'''
# get the raw pdf file data
filedata = self.requestRaw(Endpoints.REPORT_DETAILED + ".csv", parameters=data)
if filename:
# write the data to a file
with open(filename, "wb") as pdf:
pdf.write(filedata)
else:
return filedata
def getSummaryReport(self, data):
'''return a summary report for a user'''
return self.request(Endpoints.REPORT_SUMMARY, parameters=data)
def getSummaryReportPDF(self, data, filename):
'''save a summary report as a pdf'''
# get the raw pdf file data
filedata = self.requestRaw(Endpoints.REPORT_SUMMARY + ".pdf", parameters=data)
# write the data to a file
with open(filename, "wb") as pdf:
pdf.write(filedata)
# --------------------------------
# Methods for creating, updating, and deleting clients
# ---------------------------------
def createClient(self, name, wid, notes=None):
"""
create a new client
:param name: Name the client
:param wid: Workspace ID
:param notes: Notes for the client (optional)
"""
data = {}
data['client'] = {}
data['client']['name'] = name
data['client']['wid'] = wid
data['client']['notes'] = notes
response = self.postRequest(Endpoints.CLIENTS, parameters=data)
return self.decodeJSON(response)
def updateClient(self, id, name=None, notes=None):
"""
Update data for an existing client. If the name or notes parameter is not supplied, the existing data on the Toggl server will not be changed.
:param id: The id of the client to update
:param name: Update the name of the client (optional)
:param notes: Update the notes for the client (optional)
"""
data = {}
data['client'] = {}
data['client']['name'] = name
data['client']['notes'] = notes
response = self.postRequest(Endpoints.CLIENTS + '/{0}'.format(id), parameters=data, method='PUT')
return self.decodeJSON(response)
def deleteClient(self, id):
"""
Delete the specified client
:param id: The id of the client to delete
"""
response = self.postRequest(Endpoints.CLIENTS + '/{0}'.format(id), method='DELETE')
return response

15
zei/BatteryLevelChar.py Normal file
View File

@ -0,0 +1,15 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
from bluepy import btle
import ZeiCharBase
# pylint: disable=E1101
class BatteryLevelChar(ZeiCharBase.ZeiCharBase):
svcUUID = btle.AssignedNumbers.battery_service
charUUID = btle.AssignedNumbers.battery_level
def __init__(self, periph):
super.__init__(self, periph)

8
zei/Log.py Normal file
View File

@ -0,0 +1,8 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import logging
_log = logging.getLogger(__name__)
_log.addHandler(logging.StreamHandler())
_log.setLevel(logging.INFO)

17
zei/Zei.py Normal file
View File

@ -0,0 +1,17 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
from bluepy import btle
from . import ZeiOrientationChar
from . import ZeiDelegate
class Zei(btle.Peripheral):
def __init__(self, *args, **kwargs):
btle.Peripheral.__init__(self, *args, **kwargs)
self.withDelegate(ZeiDelegate.ZeiDelegate(self))
# activate notifications about turn
self.orientation = ZeiOrientationChar.ZeiOrientationChar(self)
self.orientation.enable()

24
zei/ZeiCharBase.py Normal file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
from bluepy import btle
import struct
class ZeiCharBase(object):
def __init__(self, periph):
self.periph = periph
self.hndl = None
#self.svcUUID = None
#self.charUUID = None
# pylint: disable=E1101
def enable(self):
_svc = self.periph.getServiceByUUID(self.svcUUID)
_chr = _svc.getCharacteristics(self.charUUID)[0]
self.hndl = _chr.getHandle()
# this is uint16_t - see: https://www.bluetooth.com/specifications/gatt/viewer?attributeXmlFile=org.bluetooth.descriptor.gatt.client_characteristic_configuration.xml
_cccd = _chr.getDescriptors(btle.AssignedNumbers.client_characteristic_configuration)[0]
_cccd.write(struct.pack("<H", 2), withResponse=True)

20
zei/ZeiDelegate.py Normal file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
import struct
from bluepy import btle
from .Log import _log
class ZeiDelegate(btle.DefaultDelegate):
def __init__(self, periph):
btle.DefaultDelegate.__init__(self)
self.parent = periph
def handleNotification(self, cHandle, data):
if cHandle == 38:
side = struct.unpack('B', data)[0]
_log.info("Current side up is %s", side )
else:
_log.info("Notification from hndl: %s - %r", cHandle, data)

23
zei/ZeiDiscovery.py Normal file
View File

@ -0,0 +1,23 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
from bluepy import btle
class ZeiDiscovery(btle.Scanner):
def __init__(self, periph=None, **kwargs):
self.zei = periph
btle.Scanner.__init__(self, **kwargs)
#self.withDelegate(ZeiDiscoveryDelegate(self, self.zei))
#self.stop_scanning = False
def reconnect(self):
self.iface=self.zei.iface
self.clear()
self.start()
while self.zei.addr not in self.scanned:
self.process(timeout=2)
self.stop()
self.zei.connect(self.scanned[self.zei.addr])

View File

@ -0,0 +1,33 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
from bluepy import btle
from Log import _log
class ZeiDiscoveryDelegate(btle.DefaultDelegate):
def __init__(self, scanner, periph):
btle.DefaultDelegate.__init__(self)
self.scanner = scanner
self.periph = periph
def handleDiscovery(self, dev, isNewDev, isNewData):
if not dev.addr == 'f1:05:a5:9c:2e:9b':
return
_log.info("Device %s (%s), RSSI=%d dB", dev.addr, dev.addrType, dev.rssi)
for (_, desc, value) in dev.getScanData():
_log.info(" %s = %s", desc, value)
# reconnect
# bluepy can only do one thing at a time, so stop scanning while trying to connect
# this is not supported by bluepy
#self.scanner.stop()
try:
self.periph.connect(dev)
self.scanner.stop_scanning = True
except:
# re
self.scanner.start()
pass

15
zei/ZeiOrientationChar.py Normal file
View File

@ -0,0 +1,15 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
from . import ZeiCharBase
def _ZEI_UUID(short_uuid):
return 'c7e7%04X-c847-11e6-8175-8c89a55d403c' % (short_uuid)
class ZeiOrientationChar(ZeiCharBase.ZeiCharBase):
svcUUID = _ZEI_UUID(0x0010)
charUUID = _ZEI_UUID(0x0012)
def __init__(self, periph):
ZeiCharBase.ZeiCharBase.__init__(self, periph)

1
zei/__init__.py Normal file
View File

@ -0,0 +1 @@
# Empty