moodleNotifer
letsloginourself.py
1 import re
2 import os
3 import json
4 import urllib
5 import urllib3
6 import requests
7 import logging
8 
9 from http.cookiejar import MozillaCookieJar
10 from urllib.parse import urlparse
11 from getpass import getpass
12 
# Host name of the Moodle installation that worker() connects to.
moodle_domain = "moodle.iitb.ac.in"
# Path under the host at which Moodle is served; concatenated directly after
# the domain when the base URL is built, so it must start and end with '/'.
moodle_path = "/"
15 
class RequestHelper:
    """
    Encapsulates the recurring logic for sending out requests to the
    Moodle-System.

    Requests rejected by Moodle are reported as RequestRejectedError. That
    exception class is not defined inside this class; it is expected to be
    available at module level.
    """

    stdHeader = {
        'User-Agent': (
            'Mozilla/5.0 (Linux; Android 7.1.1; Moto G Play Build/NPIS26.48-43-2; wv) AppleWebKit/537.36'
            + ' (KHTML, like Gecko) Version/4.0 Chrome/71.0.3578.99 Mobile Safari/537.36 MoodleMobile'
        ),
        'Content-Type': 'application/x-www-form-urlencoded',
    }

    # Expiry stamped on every cookie before post_URL saves the jar. The value
    # is 2**31 - 1; presumably chosen so that stored cookies never count as
    # expired when the jar is loaded again.
    _COOKIE_EXPIRES = 2147483647

    def __init__(
        self,
        moodle_domain: str,
        moodle_path: str = '/',
        token: str = '',
        skip_cert_verify: bool = False,
        log_responses_to: str = None,
    ):
        """
        @param moodle_domain: Host name of the Moodle system.
        @param moodle_path: Path of the Moodle installation on that host.
        @param token: Web service token used by post_REST.
        @param skip_cert_verify: If True, TLS certificates are not verified.
        @param log_responses_to: Optional path of a file to which post_REST
                                 appends every JSON response. The file is
                                 truncated when the helper is created.
        """
        self.token = token
        self.moodle_domain = moodle_domain
        self.moodle_path = moodle_path

        self.verify = not skip_cert_verify
        self.url_base = 'https://' + moodle_domain + moodle_path

        self.log_responses_to = log_responses_to
        self.log_responses = log_responses_to is not None

        if self.log_responses:
            # UTF-8 is set explicitly because post_REST dumps JSON with
            # ensure_ascii=False, which can contain non-ASCII characters.
            with open(self.log_responses_to, 'w', encoding='utf-8') as response_log_file:
                response_log_file.write('JSON Log:\n\n')

        logging.getLogger("requests").setLevel(logging.WARNING)
        logging.getLogger("urllib3").setLevel(logging.WARNING)
        # NOTE: this silences urllib3 warnings process-wide, not only for
        # requests sent through this helper.
        urllib3.disable_warnings()

    @staticmethod
    def _open_session(cookie_jar_path: str = None):
        """
        Creates a requests session; if a cookie file path is given the session
        uses a MozillaCookieJar bound to it, pre-loaded when the file exists.
        """
        session = requests.Session()

        if cookie_jar_path is not None:
            session.cookies = MozillaCookieJar(cookie_jar_path)
            if os.path.exists(cookie_jar_path):
                session.cookies.load(ignore_discard=True, ignore_expires=True)

        return session

    def post_URL(self, url: str, data: dict = None, cookie_jar_path: str = None):
        """
        Sends a POST request to a specific URL, including saving of cookies in cookie jar.
        @param url: The url to which the request is sent. (the moodle base url is not added to the given URL)
        @param data: The optional data is added to the POST body.
        @param cookie_jar_path: Path to the cookies file.
        @return: The resulting response object and the session object.
        """
        data_urlencoded = ""
        if data is not None:
            data_urlencoded = RequestHelper.recursive_urlencode(data)

        session = self._open_session(cookie_jar_path)
        response = session.post(url, data=data_urlencoded, headers=self.stdHeader, verify=self.verify)

        if cookie_jar_path is not None:
            for cookie in session.cookies:
                cookie.expires = self._COOKIE_EXPIRES
            session.cookies.save(ignore_discard=True, ignore_expires=True)

        return response, session

    def get_URL(self, url: str, cookie_jar_path: str = None):
        """
        Sends a GET request to a specific URL of the Moodle system, including additional cookies
        (cookies are updated after the request)
        @param url: The url to which the request is sent. (the moodle base url is not added to the given URL)
        @param cookie_jar_path: Path to the optional cookies file.
        @return: The resulting response object and the session object.
        """
        session = self._open_session(cookie_jar_path)
        response = session.get(url, headers=self.stdHeader, verify=self.verify)

        if cookie_jar_path is not None:
            session.cookies.save(ignore_discard=True, ignore_expires=True)

        return response, session

    def post_REST(self, function: str, data: dict = None) -> object:
        """
        Sends a POST request to the REST endpoint of the Moodle system
        @param function: The Web service function to be called.
        @param data: The optional data is added to the POST body.
        @return: The JSON response returned by the Moodle system, already
                 checked for errors.
        @raise ValueError: If no token is set.
        """
        # The constructor default is '' (not None), so test for emptiness;
        # an `is None` check would never trigger for a helper built without
        # a token.
        if not self.token:
            raise ValueError('The required Token is not set!')

        data_urlencoded = self._get_POST_DATA(function, self.token, data)
        url = self._get_REST_POST_URL(self.url_base, function)

        response = requests.post(url, data=data_urlencoded, headers=self.stdHeader, verify=self.verify)
        json_result = self._initial_parse(response)

        # The autologin-key response is deliberately kept out of the log.
        if self.log_responses and function not in ['tool_mobile_get_autologin_key']:
            with open(self.log_responses_to, 'a', encoding='utf-8') as response_log_file:
                response_log_file.write('URL: {}\n'.format(response.url))
                response_log_file.write('Function: {}\n\n'.format(function))
                response_log_file.write('Data: {}\n\n'.format(data))
                response_log_file.write(json.dumps(json_result, indent=4, ensure_ascii=False))
                response_log_file.write('\n\n\n')

        return json_result

    @staticmethod
    def _get_REST_POST_URL(url_base: str, function: str) -> str:
        """
        Generates an URL for a REST-POST request
        @param url_base: Base URL of the Moodle system, ending with '/'.
        @param function: The Web service function to be called.
        @return: A formatted URL
        """
        return '%swebservice/rest/server.php?moodlewsrestformat=json&wsfunction=%s' % (url_base, function)

    @staticmethod
    def _get_POST_DATA(function: str, token: str, data_obj: dict) -> str:
        """
        Generates the data for a REST-POST request
        @param function: The Web service function to be called.
        @param token: The Web service token.
        @param data_obj: Optional extra arguments (may be None).
        @return: A URL-encoded data string
        """
        data = {'moodlewssettingfilter': 'true', 'moodlewssettingfileurl': 'true'}

        if data_obj is not None:
            data.update(data_obj)

        # Applied last so that caller data cannot override function or token.
        data.update({'wsfunction': function, 'wstoken': token})

        return RequestHelper.recursive_urlencode(data)

    def get_login(self, data: dict) -> object:
        """
        Sends a POST request to the login endpoint of the Moodle system to
        obtain a token in JSON format.
        @param data: The data is inserted into the Post-Body as arguments. This
                     should contain the login data.
        @return: The JSON response returned by the Moodle System, already
                 checked for errors.
        """
        response = requests.post(
            '%slogin/token.php' % (self.url_base),
            data=urllib.parse.urlencode(data),
            headers=self.stdHeader,
            verify=self.verify,
        )

        return self._initial_parse(response)

    @staticmethod
    def _check_response_code(response):
        """
        Raises a RuntimeError carrying status, headers and body if the
        response status code is not 200.
        """
        # Normally Moodle answer with response 200
        if response.status_code != 200:
            raise RuntimeError(
                'An Unexpected Error happened on side of the Moodle System!'
                + (' Status-Code: %s' % str(response.status_code))
                + ('\nHeader: %s' % response.headers)
                + ('\nResponse: %s' % response.text)
            )

    @staticmethod
    def _parse_moodle_version(changelog: str) -> float:
        """
        Extracts the version from the first '=== x.y.z ===' heading of a
        change-log text; falls back to 1.0 if there is no such heading.
        """
        version_string = '1'
        # splitlines() also copes with CRLF line endings, which would leave a
        # trailing '\r' that the pattern's '$' cannot match.
        for line in changelog.splitlines():
            match = re.match(r'^===\s*([\d\.]+)\s*===$', line)
            if match:
                version_string = match.group(1)
                break

        # Everything after the major number is squeezed into the fraction:
        # '3.9.1' becomes 3.91.
        # NOTE(review): a two-digit minor such as '3.10' becomes 3.1 and thus
        # compares lower than 3.9 - confirm that callers tolerate this.
        major_version = version_string.split('.')[0]
        minor_version = version_string[len(major_version):].replace('.', '')

        return float(major_version + '.' + minor_version)

    def get_simple_moodle_version(self) -> float:
        """
        Query the version by looking up the change-log (/lib/upgrade.txt)
        of the Moodle
        @return: a float number representing the newest version
                 parsed from the change-log
        """
        url = '%slib/upgrade.txt' % (self.url_base)
        response = requests.get(url, headers=self.stdHeader, verify=self.verify)

        self._check_response_code(response)

        return self._parse_moodle_version(str(response.text))

    def _initial_parse(self, response) -> object:
        """
        The first time parsing the result of a REST request.
        It is checked for known errors.
        @param response: The response object received from the Moodle system
        @return: The parsed JSON object
        @raise RuntimeError: If the status code is not 200 or the body is not
                             valid JSON.
        @raise RequestRejectedError: If the JSON object carries an 'error' or
                                     'exception' key.
        """
        self._check_response_code(response)

        # Try to parse the JSON
        try:
            response_extracted = response.json()
        except ValueError as error:
            # Response objects expose the body as .text; there is no .read().
            raise RuntimeError(
                'An Unexpected Error occurred while trying'
                + ' to parse the json response! Moodle'
                + ' response: %s.\nError: %s' % (response.text, error)
            ) from error

        # Only JSON objects can carry error fields; lists, strings and null
        # are handed back untouched.
        if not isinstance(response_extracted, dict):
            return response_extracted

        # Check for known errors
        if 'error' in response_extracted:
            error = response_extracted.get('error', '')
            errorcode = response_extracted.get('errorcode', '')
            stacktrace = response_extracted.get('stacktrace', '')
            debuginfo = response_extracted.get('debuginfo', '')
            reproductionlink = response_extracted.get('reproductionlink', '')
            raise RequestRejectedError(
                'The Moodle System rejected the Request.'
                + (' Details: %s (Errorcode: %s, ' % (error, errorcode))
                + ('Stacktrace: %s, Debuginfo: %s, Reproductionlink: %s)' % (stacktrace, debuginfo, reproductionlink))
            )

        if 'exception' in response_extracted:
            exception = response_extracted.get('exception', '')
            errorcode = response_extracted.get('errorcode', '')
            message = response_extracted.get('message', '')

            raise RequestRejectedError(
                'The Moodle System rejected the Request.'
                + ' Details: %s (Errorcode: %s, Message: %s)' % (exception, errorcode, message)
            )

        return response_extracted

    @staticmethod
    def recursive_urlencode(data: dict) -> str:
        """URL-encode a multidimensional dictionary.

        Nested keys are written in bracket notation, e.g.
        {'a': {'b': 1}} becomes 'a[b]=1'.
        @param data: the data to be encoded
        @returns: the url encoded data
        """

        def recursion(node, base=()):
            pairs = []

            for key, value in node.items():
                path = base + (key,)
                # Anything dict-like is descended into; all else is a leaf.
                if hasattr(value, 'values'):
                    pairs += recursion(value, path)
                    continue

                # Keys are stringified on every level so that e.g. integer
                # indices can be used as nested keys.
                encoded_path = [urllib.parse.quote(str(part)) for part in path]
                encoded_value = urllib.parse.quote(str(value))
                if len(encoded_path) > 1:
                    pairs.append('%s[%s]=%s' % (encoded_path[0], ']['.join(encoded_path[1:]), encoded_value))
                else:
                    pairs.append('%s=%s' % (encoded_path[0], encoded_value))
            return pairs

        return '&'.join(recursion(data))
298 
def obtain_login_token(
    username: str, password: str, moodle_domain: str, moodle_path: str = '/', skip_cert_verify: bool = False
) -> tuple:
    """
    Send the login credentials to the Moodle-System and extracts the resulting Login-Token.
    @param username: The login name.
    @param password: The password of that user.
    @param moodle_domain: Host name of the Moodle system.
    @param moodle_path: Path of the Moodle installation on that host.
    @param skip_cert_verify: If True, TLS certificates are not verified.
    @return: A tuple (token, privatetoken, request_helper). privatetoken is
             None when the Moodle system did not send one. The tuple always
             has three elements so that callers can unpack it the same way
             in both cases.
    @raise RuntimeError: If the response contains no token.
    """
    login_data = {'username': username, 'password': password, 'service': 'moodle_mobile_app'}

    obj = RequestHelper(moodle_domain, moodle_path, skip_cert_verify=skip_cert_verify)
    response = obj.get_login(login_data)

    if 'token' not in response:
        # = we didn't get an error page (checked by the RequestHelper) but
        # somehow we don't have the needed token
        raise RuntimeError('Invalid response received from the Moodle System! No token was received.')

    return response['token'], response.get('privatetoken'), obj
321 
322 def _split_moodle_uri(moodle_uri: str):
323 
324  moodle_domain = moodle_uri.netloc
325  moodle_path = moodle_uri.path
326  if not moodle_path.endswith('/'):
327  moodle_path = moodle_path + '/'
328 
329  if moodle_path == '':
330  moodle_path = '/'
331 
332  return moodle_domain, moodle_path
333 
334 
def worker(moodle_token):
    """
    Creates a RequestHelper for the Moodle system configured at module level.
    @param moodle_token: The Web service token used for REST requests.
    @return: A RequestHelper bound to moodle_domain and moodle_path, with
             certificate verification enabled.
    """
    return RequestHelper(moodle_domain, moodle_path, moodle_token, skip_cert_verify=False)
This class creates and formats the requests made to Moodle API functions.
str _get_POST_DATA(str function, str token, str data_obj)
object get_login(self, {str:str} data)
str _get_REST_POST_URL(str url_base, str function)
object _initial_parse(self, response)
object post_REST(self, str function, {str:str} data=None)
def get_URL(self, str url, str cookie_jar_path=None)
def post_URL(self, str url, {str:str} data=None, str cookie_jar_path=None)