-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy pathconfig.py
More file actions
261 lines (233 loc) · 8.1 KB
/
config.py
File metadata and controls
261 lines (233 loc) · 8.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
from __future__ import print_function
import os
import stat
from grp import getgrgid
from pwd import getpwuid
from jsonschema import validate, ValidationError
from yaml import load
from yaml.error import YAMLError
from amazon_dash.exceptions import SecurityException, ConfigFileNotFoundError, InvalidConfig
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
#: JSON Schema (as a Python dict) used to validate the parsed configuration.
#: Top-level sections: ``settings`` and ``confirmations`` are optional,
#: ``devices`` is required.
SCHEMA = {
    "title": "Config",
    "type": "object",
    "properties": {
        "settings": {
            "type": "object",
            "properties": {
                "delay": {
                    "type": "integer"
                },
                "interface": {
                    "type": [
                        "array",
                        "string",
                        "null"
                    ]
                },
            }
        },
        "devices": {
            "type": "object",
            "properties": {
                "/": {}
            },
            "patternProperties": {
                # Device keys must be MAC addresses written as six
                # colon-separated hexadecimal pairs.
                "^([0-9A-Fa-f]{2}[:]){5}([0-9A-Fa-f]{2})$": {
                    "type": "object",
                    "properties": {
                        "name": {
                            "type": "string"
                        },
                        "cmd": {
                            "type": "string"
                        },
                        "user": {
                            "type": "string",
                        },
                        "cwd": {
                            "type": "string",
                        },
                        "url": {
                            "type": "string"
                        },
                        "method": {
                            "type": "string",
                            # Each entry accepts one HTTP method in either
                            # all-upper or all-lower case.
                            "oneOf": [
                                {"pattern": "GET|get"},
                                {"pattern": "HEAD|head"},
                                {"pattern": "POST|post"},
                                {"pattern": "PUT|put"},
                                {"pattern": "DELETE|delete"},
                                {"pattern": "CONNECT|connect"},
                                {"pattern": "OPTIONS|options"},
                                # Was "trace|trace", which rejected the
                                # upper-case spelling accepted for every
                                # other method.
                                {"pattern": "TRACE|trace"},
                                {"pattern": "PATCH|patch"},
                            ]
                        },
                        "headers": {
                            "type": "object",
                        },
                        "content-type": {
                            "type": "string"
                        },
                        "body": {
                            "type": "string"
                        },
                        "homeassistant": {
                            "type": "string"
                        },
                        "ifttt": {
                            "type": "string"
                        },
                        "event": {
                            "type": "string"
                        },
                        "confirmation": {
                            "type": "string",
                        }
                    },
                }
            },
            # Any device key that is not "/" or a MAC address is rejected.
            "additionalProperties": False,
        },
        "confirmations": {
            "type": "object",
            "properties": {
                "/": {}
            },
            "patternProperties": {
                # Any non-empty name is accepted for a confirmation entry.
                "^.+$": {
                    "type": "object",
                    "properties": {
                        "service": {
                            "enum": [
                                'telegram',
                                'pushbullet',
                            ]
                        },
                        "token": {
                            "type": "string",
                        },
                        "is_default": {
                            "type": "boolean",
                        },
                        "to": {
                            "type": "integer"
                        },
                        "success_message": {
                            "type": "string"
                        },
                        "failure_message": {
                            "type": "string"
                        },
                    },
                    "required": ["service"],
                }
            },
        }
    },
    "required": ["devices"]
}
def get_file_owner(file):
    """Look up the name of the user that owns a file.

    :param str file: Path to file
    :return: owner's user name, or ``'???'`` when the owning uid has no
        entry in the password database
    :rtype: str
    """
    owner_uid = os.stat(file).st_uid
    try:
        passwd_entry = getpwuid(owner_uid)
    except KeyError:
        # getpwuid raises KeyError for a uid without a passwd entry.
        return '???'
    # The first field of the passwd entry is the login name.
    return passwd_entry[0]
def get_file_group(file):
    """Look up the name of the group that owns a file.

    :param str file: Path to file
    :return: owning group's name, or ``'???'`` when the owning gid has no
        entry in the group database
    :rtype: str
    """
    try:
        # Use the file's *group* id; the previous code passed st_uid here,
        # which looked up an unrelated group (or none at all).
        return getgrgid(os.stat(file).st_gid)[0]
    except KeyError:
        # getgrgid raises KeyError for a gid without a group entry.
        return '???'
def bitperm(s, perm, pos):
    """Mask a file mode with a single permission bit.

    The result is zero when the requested permission is not granted and the
    (positive) value of the matching ``stat.S_I*`` constant when it is.

    :param os.stat_result s: os.stat(file) object
    :param str perm: R (Read) or W (Write) or X (eXecute), case-insensitive
    :param str pos: USR (USeR) or GRP (GRouP) or OTH (OTHer), case-insensitive
    :return: mask value
    :rtype: int
    """
    perm_letter = perm.upper()
    who = pos.upper()
    assert perm_letter in ['R', 'W', 'X']
    assert who in ['USR', 'GRP', 'OTH']
    # Resolve e.g. ('W', 'OTH') to the stat.S_IWOTH constant.
    mask_name = 'S_I{}{}'.format(perm_letter, who)
    mask = getattr(stat, mask_name)
    return s.st_mode & mask
def oth_w_perm(file):
    """Tell whether users other than the owner and group can write the file.

    :param str file: Path to file
    :return: non-zero (truthy) mask value if others have write permission,
        zero otherwise
    :rtype: int
    """
    file_stat = os.stat(file)
    return bitperm(file_stat, 'w', 'oth')
def only_root_write(path):
    """Check that nobody except root is able to write to the file.

    :param str path: Path to file
    :return: True if only root can write
    :rtype: bool
    """
    info = os.stat(path)
    # Pair each owning id with its write bit; id 0 is root.
    owner_checks = (
        (info.st_uid, bitperm(info, 'w', 'usr')),
        (info.st_gid, bitperm(info, 'w', 'grp')),
    )
    # A non-root owner (user or group) holding the write bit fails the check.
    if any(ident and writable for ident, writable in owner_checks):
        return False
    # Write access for "others" always fails the check.
    return not bitperm(info, 'w', 'oth')
class Config(dict):
    """Parse and validate yaml Amazon-dash file config. The instance behaves like a dictionary
    """

    def __init__(self, file, ignore_perms=False, **kwargs):
        """Set the config file, validate file permissions and load the file

        :param str file: path to file
        :param bool ignore_perms: skip the file permission checks when True
        :param kwargs: default values in dict
        :raises ConfigFileNotFoundError: if ``file`` does not exist
        :raises SecurityException: if the file permissions are too open
        :raises InvalidConfig: if the file cannot be parsed or fails schema validation
        """
        super(Config, self).__init__(**kwargs)
        if not os.path.lexists(file):
            raise ConfigFileNotFoundError(file)
        # When running as root (uid 0) the file must be writable by root only;
        # for any user, the file must never be writable by "others".
        if not ignore_perms and ((not os.getuid() and not only_root_write(file)) or oth_w_perm(file)):
            file = os.path.abspath(file)
            raise SecurityException(
                'There should be no permissions for other users in the file "{file}". '
                # Permission bits are shown in octal (e.g. 666), the notation chmod uses.
                'Current permissions: {user}:{group} {perms:o}. {msg}. '
                'Run "sudo chmod 660 \'{file}\' && sudo chown root:root \'{file}\'"'.format(
                    file=file, user=get_file_owner(file),
                    group=get_file_group(file), perms=os.stat(file).st_mode & 0o777,
                    msg='Removes write permission for others' if os.getuid()
                    else 'Only root must be able to write to file'))
        self.file = file
        self.read()

    def read(self):
        """Parse and validate the config file. The read data is accessible as a dictionary in this instance

        :return: None
        :raises InvalidConfig: if the file is not valid YAML, cannot be decoded,
            or does not match SCHEMA
        """
        try:
            # Close the file deterministically instead of leaking the handle.
            with open(self.file) as config_file:
                # NOTE(review): ``Loader`` is PyYAML's full loader, which can
                # construct arbitrary Python objects. The permission checks in
                # __init__ limit who can edit the file, but consider a safe
                # loader if the config never needs custom tags.
                data = load(config_file, Loader)
        except (UnicodeDecodeError, YAMLError) as e:
            raise InvalidConfig(self.file, '{}'.format(e))
        try:
            validate(data, SCHEMA)
        except ValidationError as e:
            raise InvalidConfig(self.file, e)
        self.update(data)
def check_config(file, printfn=print):
    """Command to check configuration file. Raises InvalidConfig on error

    :param str file: path to config file
    :param printfn: print function for success message
    :return: None
    """
    # Constructing Config already parses and validates the file (its __init__
    # calls read()), so no explicit second read() is needed.
    Config(file)
    printfn('The configuration file "{}" is correct'.format(file))