PyXMake Developer Guide 1.0
PyXMake
Loading...
Searching...
No Matches
Utility.py
1# -*- coding: utf-8 -*-
2# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
3# % Utility - Classes and Functions %
4# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
5"""
6Classes and functions defined for convenience.
7
8@note: PyXMake module
9Created on 15.07.2016
10
11@version: 1.0
12----------------------------------------------------------------------------------------------
13@requires:
14 -
15
16@change:
17 -
18
19@author: garb_ma [DLR-FA,STM Braunschweig]
20----------------------------------------------------------------------------------------------
21"""
22
23## @package PyXMake.Tools.Utility
24# Module of basic functions.
25## @author
26# Marc Garbade
27## @date
28# 15.07.2017
29## @par Notes/Changes
30# - Added documentation // mg 29.03.2018
31
32try:
33 ## Only meaningful < 3.12
34 from future import standard_library
35 # Deprecated since 3.12
36 standard_library.install_aliases()
37except:
38 pass
39
40try:
41 from builtins import str
42 from builtins import object
43except ImportError:
44 pass
45
46try:
47 FileNotFoundError
48except NameError:
49 FileNotFoundError = IOError # @ReservedAssignment
50
51import sys
52import os
53import io
54import six
55import ast
56import abc
57import site
58import json
59import shutil
60import shlex
61import logging
62import functools
63import subprocess
64import platform
65import stat
66import glob
67import time
68import struct
69import socket
70import tarfile
71import textwrap
72import posixpath, ntpath
73
74import numpy as np
75import random as rd
76import tempfile
77import zipfile
78import copy
79import urllib
80
81try:
82 import cPickle as cp # @UnusedImport
83except:
84 import pickle as cp # @Reimport
85
86try:
87 from contextlib import contextmanager # @UnusedImport
88except ImportError:
89 from contextlib2 import contextmanager # @Reimport
90
91from types import MethodType
92from .. import PyXMakePath
93
try:
    # Prefer the real coverage-registration decorator when PyXMake's build tools are importable.
    from PyXMake.Build import Make
    autotest = Make.Coverage.add
except:
    def autotest(*args, **kwargs):
        """
        No-op replacement decorator factory used when the coverage registry is unavailable.
        """
        def passthrough(func): return func
        return passthrough

## Create an alias using default logger for all print statements
logger = logging.getLogger(__name__)
104# setattr(sys.modules[__name__],"print", logger.info)
105
106## @class PyXMake.Tools.Utility.AbstractImport
107# Inherited from built-in object.
class AbstractImport(object): # pragma: no cover
    """
    Abstract (lazy) import class to construct a module with attributes which are only really loaded into memory when first accessed.
    It defaults to lazy import behavior.

    @note: Derived from https://stackoverflow.com/questions/77319516/lazy-import-from-a-module-a-k-a-lazy-evaluation-of-variable
    """
    def __init__(self, *args, **kwargs):
        """
        Low-level initialization of parent class.

        @note: Never executed, since __new__ returns a module object instead of an instance.
        """
        pass

    def __new__(cls, name, package=None, **kwargs):
        """
        An approximate implementation of import, following the stdlib importlib recipe.

        @param: name: Absolute or relative module name.
        @param: package: Anchor package for relative names.
        @note: Set lazy_import=False to force an eager import.
        """
        # Import the submodule explicitly: "import importlib" alone does not
        # guarantee that the "importlib.util" attribute is bound.
        import importlib.util

        absolute_name = importlib.util.resolve_name(name, package)

        # Re-use an already imported module (EAFP).
        try: return sys.modules[absolute_name]
        except KeyError: pass

        path = None
        if '.' in absolute_name:
            # Import the parent first so its search locations can be used for the child.
            parent_name, _, child_name = absolute_name.rpartition('.')
            parent_module = importlib.import_module(parent_name)
            path = parent_module.__spec__.submodule_search_locations
        for finder in sys.meta_path:
            spec = finder.find_spec(absolute_name, path)
            if spec is not None:
                break
        else:
            msg = 'No module named %s' % absolute_name
            raise ModuleNotFoundError(msg, name=absolute_name)

        # Wrap the loader so the module body executes only on first attribute access.
        if kwargs.get("lazy_import",True):
            loader = importlib.util.LazyLoader(spec.loader)
            spec.loader = loader

        module = importlib.util.module_from_spec(spec)
        # Register before executing so recursive imports resolve correctly.
        sys.modules[absolute_name] = module
        spec.loader.exec_module(module)

        # Bind the child on its parent package, mirroring a real "import a.b".
        if path is not None:
            setattr(parent_module, child_name, module)
        return module
156
157## @class PyXMake.Tools.Utility.AbstractBase
158# Abstract meta class for all data class objects. Inherited from built-in ABCMeta & object.
159# Compatible with both Python 2.x and 3.x.
@six.add_metaclass(abc.ABCMeta)
class AbstractBase(object): # pragma: no cover
    """
    Parent class for all abstract base classes.
    """
    @abc.abstractmethod
    def __init__(self, *args, **kwargs):
        """
        Low-level initialization of parent class.
        """
        pass

    @classmethod
    def __new__(cls, *args, **kwargs):
        """
        Check if the current base is an abstract base.
        """
        # Direct instantiation of the abstract base itself is forbidden.
        if cls.__bases__[-1].__name__ == AbstractBase.__name__: raise TypeError("Can't instantiate abstract base class %s." % cls.__name__)
        try: return super(AbstractBase,cls).__new__(cls)
        ## If any module in PyXMake is reloaded during runtime, simply executing super might fail.
        # Resolving the common ancestor explicitly solves this issue.
        except TypeError: return super(ClassWalk(AbstractBase,cls),cls).__new__(cls)

    @classmethod
    def recover(cls, *args):
        """
        Recover a derived data class completely from its JSON or dictionary form.
        """
        class Base(object):
            """
            Subclass instance for initialization.
            """
            def __init__(self, _dictionary):
                """
                Initialization of any class instance from a plain dictionary.
                """
                for key, value in _dictionary.items(): setattr(self, key, value)
        # Accept either a dictionary directly or a JSON string.
        blueprint = args[0]
        if not isinstance(blueprint,dict): blueprint = RecoverDictionaryfromPickling(json.loads(args[0]))
        # Assemble and instantiate a working class on the fly.
        return type(cls.__name__, (Base, cls), {})(blueprint)

    @classmethod
    def classify(cls, *args, **kwargs):
        """
        Serializes an arbitrary data class instantiation call. Returns the complete class as JSON.
        """
        realization = cls(*args)
        # Support "blank" attributes supplied as keyword arguments.
        if kwargs:
            for attribute, payload in kwargs.items(): getattr(realization, attribute)(*payload if isinstance(payload,list) else payload)
        if hasattr(realization,"create"): getattr(realization,"create")()
        return realization.jsonify()

    def jsonify(self):
        """
        Create a JSON representation of the current class.
        """
        return self.__str__()

    def update(self, **kwargs):
        """
        Update any given class attribute.
        """
        for key, value in kwargs.items(): setattr(self, key, value)

    def __repr__(self):
        """
        Returns a string representation of the current instance.
        """
        return str("%s.%s(%s)") % (type(self).__name__, self.recover.__name__, str(self))

    def __str__(self):
        """
        Prepare an object for JSON (2to3 compatible). Returns a canonical data representation of the current instance.
        """
        return json.dumps(self, default=PrepareObjectforPickling)

    def __getstate__(self):
        """
        Prepare the object for pickling (2to3 compatible).
        """
        return PrepareObjectforPickling(self)

    def __setstate__(self, _dict):
        """
        Recover a dictionary from pickling (2to3 compatible).
        """
        self.__dict__.update(RecoverDictionaryfromPickling(_dict))

    @staticmethod
    def __getbase__(base,cls):
        """
        Recursively find the common ancestor in all bases for a given class and compare them with the supplied base.

        @note: Returns None if no common ancestor can be found.
        """
        return ClassWalk(base, cls)
263
264## @class PyXMake.Tools.Utility.AbstractMethod
265# Class to create 2to3 compatible pickling dictionary. Inherited from built-in object.
class AbstractMethod(object):
    """
    Abstract method to construct an instance and class method with the same descriptor.

    @note: Derived from https://stackoverflow.com/questions/2589690/creating-a-method-that-is-simultaneously-an-instance-and-class-method
    """
    def __init__(self, method):
        """
        Store the wrapped callable.
        """
        self.method = method

    def __get__(self, obj=None, objtype=None): # pragma: no cover
        """
        Custom descriptor for this class. Returns method either as class or as an instance.
        """
        @functools.wraps(self.method)
        def _wrapper(*args, **kwargs):
            """
            Dispatch to the wrapped method, binding either the instance or the owner class.
            """
            # Prefer the instance when the descriptor is accessed through one.
            target = obj if obj is not None else objtype
            return self.method(target, *args, **kwargs)
        return _wrapper
290
291## @class PyXMake.Tools.Utility.ChangedWorkingDirectory
292# Class to create 2to3 compatible pickling dictionary. Inherited from built-in object.
class ChangedWorkingDirectory(object):
    """
    Context manager for temporarily changing the current working directory.

    @author: Brian M. Hunt
    """
    def __init__(self, newPath):
        """
        Store the expanded target path and create it on demand.
        """
        self.newPath = os.path.expanduser(newPath)
        # Create target directory & all intermediate directories if they do not exist yet.
        if self.newPath != os.getcwd() and not os.path.exists(self.newPath):
            print("==================================")
            print("Creating a new scratch folder @: %s" % self.newPath)
            print("This folder will not be deleted once the job is done!")
            print("==================================")
            os.makedirs(self.newPath)

    def __enter__(self):
        # Remember the origin before switching.
        self.savedPath = os.getcwd()
        os.chdir(self.newPath)

    def __exit__(self, etype, value, traceback):
        # Always return to the original directory.
        os.chdir(self.savedPath)
315
316## @class PyXMake.Tools.Utility.GetDataFromPickle
317# Class to create 2to3 compatible pickling dictionary. Inherited from built-in object.
class GetDataFromPickle(object): # pragma: no cover
    """
    Class to convert an arbitrary pickle file (2.x & 3.x) into a readable
    dictionary.
    """
    def __init__(self, FileName):
        """
        Get a dictionary from a *.cpd file. The source file is removed afterwards.

        @param: self, FileName
        @type: self: object
        @type: FileName: string
        """
        ## Dictionary for further processing.
        self.Data = GetDataFromPickle.getDictfromFile(FileName)
        # The source file is consumed deliberately.
        os.remove(FileName)

    @staticmethod
    def getDictfromFile(FileName):
        """
        Open a *.cpd file and extract the dictionary stored within.

        @param: FileName
        @type: FileName: string
        """
        # Use a context manager so the handle is closed even if unpickling fails.
        with open(FileName, "rb") as FileIn:
            return cp.load(FileIn)
347
348## @class PyXMake.Tools.Utility.UpdateZIP
349# Class to create 2to3 compatible pickling dictionary. Inherited from built-in object.
class UpdateZIP(object): # pragma: no cover
    """
    Context manager for updating an existing ZIP folder.

    Extracts the given archive into the current working directory on entry and
    re-packs everything found there into OutputPath on exit.

    @author: Marc Garbade
    """
    # Empty ZIP archive binary string (end-of-central-directory record only).
    __binary_empty_archive = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

    @staticmethod
    def create(filename):
        """
        Create a new compatible empty archive.

        @author: Marc Garbade
        """
        # Local import of UploadFile
        from fastapi import UploadFile
        # Create new archive and account for signature change between fastapi versions.
        try: archive = UploadFile(None, filename=filename)
        except TypeError: archive = UploadFile(filename)
        # Return the archive
        return archive

    def __init__(self, zipname, zipdata=None, outpath=None, exclude=None, update=True, **kwargs):
        """
        Materialize the archive in the current working directory.

        @param: zipname: Name of the archive (created in the current working directory).
        @param: zipdata: Optional readable stream with existing archive content.
        @param: outpath: Target directory of the re-packed archive. Defaults to the
                         current working directory at call time (not at definition time).
        @param: exclude: File names to omit when re-packing. Copied defensively so the
                         caller's list is never mutated (avoids the shared mutable default).
        @param: update: If False, all pre-existing top-level files are excluded as well.
        """
        self.ZipName = zipname
        # Work on a private copy of the exclusion list.
        self.ExcludeFiles = list(exclude) if exclude else []
        try: self.ZipData = copy.deepcopy(zipdata)
        except:
            # Data is not empty, but contains complex objects where copy fails: spool to disk directly.
            self.buffer = open(os.path.join(os.getcwd(), self.ZipName), 'wb+')
            shutil.copyfileobj(zipdata, self.buffer)

        # If no ZIP data has been given or ZIP data is empty, fall back to an empty archive.
        if not zipdata or (hasattr(zipdata, "read") and zipdata.read() == b''):
            self.ZipData = tempfile.SpooledTemporaryFile()
            self.ZipData.write(self.__binary_empty_archive)
            self.ZipData.seek(0)

        # Collect content of the ZIP folder from input (unless already written above).
        if not os.path.exists(os.path.join(os.getcwd(), self.ZipName)):
            self.buffer = open(os.path.join(os.getcwd(), self.ZipName), 'wb+')
            shutil.copyfileobj(self.ZipData, self.buffer)
        # At this point, buffer will always be set. Either by the exception or by the if-clause.
        self.buffer.close()

        # Initialize local variables
        self.Output = io.BytesIO()
        # Resolve the output directory at call time.
        self.OutputPath = outpath if outpath is not None else os.getcwd()
        self.IgnoreExtension = kwargs.get("ignore_extension",(".zip", ".obj"))

        self.Update = update

    def __enter__(self):
        # Extract data in the current workspace to examine its content.
        with zipfile.ZipFile(str(self.ZipName)) as Input:
            Input.extractall()
        os.remove(self.ZipName)

        # Do not copy input files back into the new zip folder.
        if not self.Update:
            self.ExcludeFiles.extend([f for f in os.listdir(os.getcwd()) if os.path.isfile(os.path.join(os.getcwd(), f))])

    def __exit__(self, etype, value, traceback):
        # Collect all newly created files and store them in a memory ZIP folder.
        with zipfile.ZipFile(self.Output,"w", zipfile.ZIP_DEFLATED) as Patch:
            cwd = os.getcwd()
            for dirpath, _, filenames in os.walk(cwd):
                for f in filenames:
                    filepath = os.path.join(dirpath,f)
                    arcpath = filepath.split(cwd)[-1]
                    # Add all result files to the zip folder. Ignore old zip and object files.
                    if not f.endswith(self.IgnoreExtension) and f not in self.ExcludeFiles:
                        Patch.write(filepath, arcpath)

        # Write content of memory ZIP folder to disk (for download). Everything else has been removed by now.
        with open(os.path.join(self.OutputPath,self.ZipName), "wb") as f:
            f.write(self.Output.getvalue())
428
@contextmanager
def TemporaryDirectory(default=None):
    """
    Create a temporary directory for use with the "with" statement. Its content is deleted after execution.

    @param: default
    @type: default: string
    """
    @contextmanager
    def Changed(newdir, cleanup=lambda: True):
        """
        Local helper: enter newdir, then restore the previous directory and run cleanup.
        """
        prevdir = os.getcwd()
        os.chdir(os.path.expanduser(newdir))
        try:
            yield
        finally:
            os.chdir(prevdir)
            cleanup()
    # Create a new temporary folder below default.
    # Uses platform-dependent defaults when set to None.
    scratch = tempfile.mkdtemp(dir=default)
    def cleanup():
        # Fall back to the robust removal helper when rmtree fails (e.g. read-only files).
        try: shutil.rmtree(scratch)
        except: DeleteRedundantFolders(scratch, ignore_readonly=True)
    with Changed(scratch, cleanup):
        yield scratch
457
@contextmanager
def TemporaryEnvironment(environ=None): # pragma: no cover
    """
    Temporarily set process environment variables, restoring the previous
    environment on exit.

    @param: environ: Mapping of variable names to values. Defaults to no changes.
    """
    # Avoid a shared mutable default argument; treat None as "no changes".
    overrides = environ or {}
    old_environ = os.environ.copy()
    os.environ.update(overrides)
    try: yield
    finally:
        # Restore the exact previous environment, dropping any keys added meanwhile.
        os.environ.clear()
        os.environ.update(old_environ)
469
@contextmanager
def ConsoleRedirect(to=os.devnull, stdout=None): # pragma: no cover
    """
    Redirect console output to a given file.
    """
    def fileno(file_or_fd):
        """
        Small helper function to check the validity of the dump object.
        """
        fd = getattr(file_or_fd, 'fileno', lambda: file_or_fd)()
        if not isinstance(fd, int):
            raise ValueError("Expected a file (`.fileno()`) or a file descriptor")
        return fd

    def flush(stream):
        """
        Also flush c stdio buffers on python 3 (if possible).
        """
        try:
            import ctypes
            from ctypes.util import find_library
        except ImportError:
            libc = None
        else:
            # Windows exposes the C runtime as msvcrt; fall back to libc elsewhere.
            try: libc = ctypes.cdll.msvcrt
            except OSError: libc = ctypes.cdll.LoadLibrary(find_library('c'))
        try:
            # Flush output associated with C/C++
            libc.fflush(ctypes.c_void_p.in_dll(libc, 'stdout'))
        except (AttributeError, ValueError, IOError):
            pass # unsupported
        # Regular flush
        stream.flush()

    if stdout is None: stdout = sys.stdout

    stdout_fd = fileno(stdout)
    # Duplicate stdout_fd before it is overwritten.
    #NOTE: `copied` is inheritable on Windows when duplicating a standard stream
    with os.fdopen(os.dup(stdout_fd), stdout.mode) as copied:
        flush(stdout) # flush library buffers that dup2 knows nothing about
        try:
            os.dup2(fileno(to), stdout_fd) # $ exec >&to
        except ValueError: # filename
            with open(to, 'wb') as to_file:
                os.dup2(to_file.fileno(), stdout_fd) # $ exec > to
        try:
            yield stdout # allow code to be run with the redirected stdout
        finally:
            # restore stdout to its previous value
            #NOTE: dup2 makes stdout_fd inheritable unconditionally
            flush(stdout)
            os.dup2(copied.fileno(), stdout_fd) # $ exec >&copied
528
@contextmanager
def MergedConsoleRedirect(f): # pragma: no cover
    """
    Redirect all console outputs to a given stream.
    """
    # Funnel stdin and stderr into stdout first, then point stdout at the target.
    # $ exec 2>&1
    with ConsoleRedirect(to=sys.stdout, stdout=sys.stdin) as inp, \
         ConsoleRedirect(to=sys.stdout, stdout=sys.stderr) as err, \
         ConsoleRedirect(to=f, stdout=sys.stdout) as out:
        yield (inp, err, out)
537
@contextmanager
def FileOutput(FileName): # pragma: no cover
    """
    Redirect outputs to a given file.
    """
    if sys.version_info < (3,4):
        # Lower version (unstable and deprecated)
        with open(FileName, 'w', encoding="utf-8") as f, MergedConsoleRedirect(f):
            yield f
    else:
        import contextlib
        # Python 3.4 and higher
        try:
            with open(FileName, 'w', encoding="utf-8") as f, contextlib.redirect_stdout(f), MergedConsoleRedirect(sys.stdout):
                yield f
        except:
            # Python 3.6+ and in Docker container
            with open(FileName, 'w', encoding="utf-8") as f, contextlib.redirect_stdout(f):
                yield f
557
def FileUpload(url, filename, header=None, **kwargs): # pragma: no cover
    """
    Post a given file as a binary string to a given URL.

    @param: url: Target endpoint of the POST request.
    @param: filename: Path of the file to upload.
    @param: header: Optional request headers. Copied defensively, so the caller's
                    dictionary (or a shared default) is never mutated.
    @note: LFS is available if requests_toolbelt is installed. Returns None when
           every upload attempt fails.
    """
    import requests
    # Default the result so the function never raises NameError on total failure.
    r = None
    # Never mutate a shared default (or the caller's) header dictionary.
    header = dict(header or {})
    # Procedure
    try:
        # Close the file handle deterministically via a context manager.
        with open(filename,'rb') as f:
            r = requests.post(url, files = {kwargs.pop("kind",'file'): f}, headers=header, **kwargs)
    except OverflowError:
        try:
            ## The file in question is too large. Attempt large-file support.
            from requests_toolbelt.multipart import encoder
            session = requests.Session()
            with open(filename, 'rb') as f:
                form = encoder.MultipartEncoder({"documents": (filename, f, "application/octet-stream"),"composite": "NONE"})
                header.update( {"Prefer": "respond-async", "Content-Type": form.content_type})
                r = session.post(url, headers=header, data=form, **kwargs)
            session.close()
        # The necessary module is not installed. Skipping.
        except ImportError: print("The given file is too large. Skipping.")
        # We attempted uploading the file with large file support enabled, but failed.
        except: print("Failed to upload %s." % filename)
    # Return the response (None when no attempt succeeded)
    return r
583
584@autotest("www.dlr.de")
586 """
587 Get all URLs present in a given string
588 """
589 import re
590
591 # Regular expression for URL
592 regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\‍(([^\s()<>]+|(\‍([^\s()<>]+\‍)))*\‍))+(?:\‍(([^\s()<>]+|(\‍([^\s()<>]+\‍)))*\‍)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
593 url = re.findall(regex,s)
594
595 # Fail safe URL return
596 if [x[0] for x in url]:
597 return [x[0] for x in url]
598 else:
599 try: # pragma: no cover
600 return s.split("href=")[1]
601 except:
602 return []
603
604@autotest()
606 """
607 Get the PyXMake path from *__init__.
608 """
609 Path = PyXMakePath
610 return Path
611
613 """
614 Get the underlying machine platform in lower cases.
615 """
616 return str(platform.system()).lower()
617
619 """
620 Get the underlying machine architecture. Returns either x86 or x64 which corresponds to
621 32 or 64 bit systems.
622 """
623 if struct.calcsize("P") * 8 == 64:
624 arch = 'x64'
625 else: # pragma: no cover
626 arch = 'x86'
627 return arch
628
def GetLink(path): # pragma: no cover
    """
    Return the link target of a symbolic soft link.

    @note: Supports .lnk files from windows. Returns the target as well as all arguments.
    """
    delimn = " "
    result = None

    # Windows shortcut (*.lnk) resolution is only available on NT systems.
    if GetPlatform() in ["windows"] and PathLeaf(path).endswith(".lnk"):
        # Local imports
        try: import pywintypes #@UnusedImport
        # Add shared library path explicitly
        except ImportError: sys.path.append(os.path.join(site.getsitepackages()[-1],"pywin32_system32"))
        finally:
            import pywintypes #@UnusedImport @Reimport
            import pythoncom #@UnusedImport
            import win32com.client

        shortcut = win32com.client.Dispatch("WScript.Shell").CreateShortCut(path)
        # Extract target and all arguments
        result = delimn.join([shortcut.Targetpath, shortcut.Arguments])

    # Fall back to a POSIX-style symbolic soft link.
    if not result: result = os.readlink(path)

    # Return a result.
    return result
659
def GetExecutable(name, get_path=False, **kwargs):
    """
    Check whether name is on PATH and marked as executable.

    @author: Six
    @note: https://stackoverflow.com/questions/11210104/check-if-a-program-exists-from-a-python-script
    """
    ## Overwrite default search path to always include the current directory.
    # Only use the current directory if PATH variable is not accessible.
    kwargs.update({"path": kwargs.get("path",
        kwargs.pop("search_paths", os.pathsep.join([os.getenv("PATH", os.getcwd()), os.getcwd()]) if GetPlatform() != "windows" else None))})
    try:
        from whichcraft import which
    except ImportError:
        if sys.version_info < (3, 3):
            # Executed with Python 3.2 and lower without the whichcraft module.
            from distutils.spawn import find_executable
            located = find_executable(name, **kwargs)
            if not get_path:
                # Return just a boolean expression
                return located is not None
            # Return both existence flag and resolved path
            return located is not None, located
        from shutil import which
    # We reach this point in all other cases: resolve once, derive both shapes.
    located = which(name, **kwargs)
    if not get_path:
        # Return just a boolean expression
        return located is not None
    # Return both existence flag and resolved path
    return located is not None, located
693
@autotest("false")
def GetBoolean(v): # pragma: no cover
    """
    Inspect the input variable and return a Boolean value if conversion is possible.

    @Original: https://stackoverflow.com/questions/15008758/parsing-boolean-values-with-argparse
    """
    import argparse
    # Pass through values that are already boolean.
    if isinstance(v, bool):
        return v
    # Accept common textual representations (case-insensitive).
    token = v.lower()
    if token in ('yes', 'true', 't', 'y', '1', "on"):
        return True
    if token in ('no', 'false', 'f', 'n', '0', "off"):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
710
@autotest(autorun=False, stream=sys.stdout, reconfigure=True)
def GetLogger(name=None, **kwargs): # pragma: no cover
    """
    Initialize a root logger if no settings prior to loading have been found.
    Otherwise, inherit a logger from supported system logging utilities when available.
    Finally, return a reference to the created or already created logger.
    """
    # Local variables
    delimn = " "

    log_name = name
    log_format = kwargs.pop("format",None)
    log_stream = kwargs.pop("stream",None)
    log_level = kwargs.pop("level",logging.NOTSET)
    log_overwrite = kwargs.pop("overwrite", log_name != None)

    # Inherit the level of an already configured root handler when overwriting.
    try:
        if log_overwrite: log_level = logging.getLogger().handlers[0].level
    except IndexError: pass

    try:
        from fa_pyutils.service.logger import MyLogger as getLogger
        # Create a new logger from SY service module
        logger = getLogger(log_name)
        logger._setLogLevel(log_level)
        # Always use system logger in development mode
        if kwargs.pop("user",getattr(sys, "frozen", False)): raise ImportError
        logger.handlers.clear() #@UndefinedVariable
        handle = logging.StreamHandler()
        # Rebuild the format string, optionally injecting the logger name.
        log_format = logger.formatter._fmt.split() #@UndefinedVariable
        if log_name: log_format.insert(1, '%(name)s')
        log_format = delimn.join(log_format)
        # Update handle
        handle.setFormatter(logging.Formatter(log_format))
    except ImportError:
        # Fail back to default logger
        logger = logging.getLogger(__name__)
        handle = logging.StreamHandler(log_stream)
    finally:
        # Set log level to highest available
        logger.setLevel(log_level)
        logger.addHandler(handle)
        # Reconfigure logging globally
        if kwargs.pop("reconfigure",len(logging.getLogger().handlers) < 1 and log_overwrite):
            logging.basicConfig(level=log_level, format=log_format, stream=log_stream, **kwargs)
        # Only register the named logger once
        if log_name and not log_name in logging.root.manager.loggerDict: #@UndefinedVariable
            logging.root.manager.loggerDict.update({log_name:logger}) #@UndefinedVariable
    # Return logger object
    return logging.getLogger(log_name)
762
def GetRequirements(dirname, args=None, check=False, **kwargs):
    """
    Create a list of required sub-packages for a given Python project (given as full directory path).

    @param: dirname: Full path to the project directory.
    @param: args: Optional extra command line arguments passed through to pipreqs.
    @param: check: If True, cross-validate every requirement against PyPI.
    @author: garb_ma
    """
    import requests
    import datetime
    import pipreqs
    # Some minor imports
    import urllib.request
    from packaging import version
    from tempfile import NamedTemporaryFile
    from pipreqs.pipreqs import main as listreqs
    # Avoid a shared mutable default argument.
    args = list(args or [])
    # Create a temporary file for storage of binary output. Close the handle
    # immediately so the path can be re-opened on all platforms (e.g. Windows).
    handle = NamedTemporaryFile(mode="r+", suffix=".txt", encoding="utf-8", delete=False)
    tmp = handle.name; handle.close()
    command = ["--force","--savepath", tmp, "--encoding","utf-8"]
    # Assemble build command
    command.extend(args); command.append(dirname)
    # New input format style. Keep backwards compatibility.
    if version.parse(pipreqs.__version__) >= version.parse("0.4.11") and "--no-pin" in command:
        command.remove("--no-pin"); command = ["--mode", "no-pin"] + command
    # Execute command and restore system variables.
    restored = copy.deepcopy(sys.argv); sys.argv = sys.argv[0:1] + command; listreqs(); sys.argv = restored
    # Get the file as URL (just for cross-validation)
    target_url = 'file:' + urllib.request.pathname2url(tmp)
    # Store content as a whitespace-separated list.
    data = urllib.request.urlopen(target_url).read().decode('utf-8').replace(os.linesep," ").split()
    # Remove all non-existing packages
    if check:
        try:
            from stdlib_list import stdlib_list
            # Drop entries that shadow standard library modules.
            data = [str(item) for item in data if not any([x.startswith(str(item)) for x in stdlib_list()])]
        except ImportError: pass
        # Explicit version check if pinned
        responses = [(item, requests.get("https://pypi.python.org/pypi/%s/json" % str(item).split("==")[0])) for item in data]
        try: data = [str(item) for item, response in responses if str(item).split("==")[1] in response.json()["releases"] ]
        except: data = [str(item) for item, response in responses if response.status_code == 200]
        # Check for heavily dated packages. Iterate over a copy, since removing
        # from the list while iterating it would silently skip entries.
        for item in list(data):
            # Check if the last update is older than 5 years (default). Can be used to fine-tune the results and removes heavily out-dated packages
            response = dict(requests.get("https://pypi.python.org/pypi/%s/json" % str(item).split("==")[0]).json())
            try: diff = abs(int(datetime.date.today().year) - int(response["releases"][response["info"]["version"]][0]["upload_time_iso_8601"].split("-")[0]))
            except:
                ## There is something wrong with the found match.
                # Remove the entry. It either cannot be resolved correctly using PyPi or a false positive from PyReq was found locally.
                data.remove(item); continue
            if diff >= int(kwargs.get("check_updated", 5)): data.remove(item)
    # Remove temporary file (if existing!)
    try:
        os.remove(tmp)
    except FileNotFoundError:
        pass
    # Return the data as a list
    return data
818
819@autotest()
821 """
822 Return the host IP address (IP address of the machine executing this code)
823 """
824 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
825 try:
826 # Doesn't even have to be reachable
827 s.connect(('10.255.255.255', 1))
828 IP = s.getsockname()[0]
829 except Exception:
830 IP = '127.0.0.1'
831 finally:
832 s.close()
833 return IP
834
835@autotest()
837 """
838 Return the WSL IP address (IP address of the machine executing this code)
839 """
840 def hasWSL():
841 """
842 Heuristic to detect if Windows Subsystem for Linux is available.
843
844 @source: https://www.scivision.dev/python-detect-wsl/
845 """
846 if os.name == "nt": # pragma: no cover
847 wsl = shutil.which("wsl")
848 if not wsl:
849 return False
850 ret = subprocess.run(["wsl", "test", "-f", "/etc/os-release"])
851 return ret.returncode == 0
852 return False
853
854 # Return WSL IP address or none is not available (Linux, macOS)
855 if hasWSL(): return subprocess.check_output(["bash","-c","ifconfig eth0 | grep 'inet '"]).decode().split("inet ")[-1].split(" netmask")[0]
856 else: return None
857
@autotest(r'"data"')
def GetSanitizedDataFromCommand(*args, **kwargs):
    """
    Returns platform independent paths extracted from a given command or list.
    """
    def sanitize(expression, **kwargs):
        """
        Validate a given pair.
        """
        # Paths should be given in double quotes to accept spaces
        try: return ArbitraryEval(expression)
        # Parsed strings contains single quotes only
        except kwargs.get("allowed_exceptions", (ValueError, SyntaxError)) as _:
            return expression # pragma: no cover
    # Always create a flat, normalized list from the given input.
    entries = [os.path.normpath(sanitize(entry, **kwargs)) for entry in ArbitraryFlattening([args])]
    # Treat input as paths. Get system independent format
    if not kwargs.get("is_path",True):
        return entries # pragma: no cover
    # Check if paths are given relative or absolute. Rewrite to absolute in any case
    return [os.path.abspath(entry) if os.path.exists(os.path.abspath(entry))
            else os.path.abspath(os.path.join(os.getcwd(), entry)) for entry in entries]
885
886@autotest("echo")
888 """
889 Returns the active environment from a process after a given command is executed.
890 """
891 # Local function definitions
892 def validate_pair(ob): # pragma: no cover
893 """
894 Validate a given pair
895 """
896 try:
897 if not (len(ob) == 2):
898 raise RuntimeError("Unexpected result: %s" % ob)
899 except: return False
900 return True
901
902 def consume(iterable):
903 """
904 Iterate over a given stream of inputs
905 """
906 try:
907 while True: next(iterable)
908 except StopIteration: pass
909 pass
910
911 # Local imports
912 import itertools
913
914 # Begin of function body
915 delimn = " %s " % "&&" if GetPlatform() in ["windows"] else ";"
916 printenv = "set" if GetPlatform() in ["windows"] else "printenv"
917 tag = 'Done running command'
918 # construct a cmd.exe command to do accomplish this
919 command = delimn.join([command,'echo "%s"' % tag,printenv])
920 # Execute the command
921 proc = Popen(command, raw=True)
922 # parse the output sent to standard output
923 lines = proc.stdout
924 # Consume whatever output occurs until the tag is reached
925 consume(itertools.takewhile(lambda l: tag not in l, lines))
926 # Define a way to handle each KEY=VALUE line
927 handle_line = lambda l: l.rstrip().split('=',1)
928 # Parse key/values into pairs and validate them
929 pairs = map(handle_line, lines)
930 valid_pairs = filter(validate_pair, pairs)
931 # Create a dictionary of valid pairs
932 result = dict(valid_pairs)
933 # Finish the process
934 proc.communicate()
935 # Return results
936 return result
937
938@autotest()
940 """
941 Return the local DNS IP address.
942 """
943 import dns.resolver #@UnresolvedImport
944 dns_resolver = dns.resolver.Resolver()
945 # Return result
946 return dns_resolver.nameservers
947
@autotest(default = True)
def GetOpenAPIGenerator(output=None, **kwargs):
    """
    Get the latest version of OpenAPI generator (requires Internet connection).

    @author: garb_ma
    """
    # Local imports
    import requests
    import urllib.request

    from lxml import html
    from packaging.version import Version, parse

    # Default output variable
    result = None
    # Define output directory. Defaults to PyXMake sub folder
    path = output if output else os.path.join(PyXMakePath,"Build","bin","openapi")
    # Check if the path already exists or is empty. Install if no executable is found
    if not os.path.exists(path) or not os.listdir(path) and kwargs.get("silent_install",True):
        # This is the base download path
        base = "https://repo1.maven.org/maven2/org/openapitools/openapi-generator-cli"
        try:
            # Attempt to fetch latest version. Requires an active connection
            page = requests.get(base)
            page.raise_for_status()
            webpage = html.fromstring(page.content)
            # Determine the latest published version from the directory listing
            version = sorted([x[:-1] for x in webpage.xpath('//a/@href') if str(x[:-1][0]).isdigit() and isinstance(parse(x[:-1]),Version)], reverse=True)[0]
            # Allow users to pin a specific version via keyword argument
            url = posixpath.join(base,version,"openapi-generator-cli-%s.jar" % kwargs.get("version", version ) )
            # Create full output path
            os.makedirs(path, exist_ok=True)
            # Download the executable
            urllib.request.urlretrieve(url,os.path.join(path,PathLeaf(url)))
        # Something went wrong while processing the correct version
        except: raise ConnectionError("Downloading latest OpenAPI generator client failed.")
    # The latest executable within the folder is returned
    if os.listdir(path): result = os.path.join(path,sorted([x for x in os.listdir(path)], reverse=True)[0])
    # Return None or the absolute path to the client
    return result
990
991@autotest("DLR")
def GetBibliographyData(fulltext, verbose=0):
    """
    Perform a full text search using Googles' book API.

    @param: fulltext - Free text search string (e.g. a title or ISBN).
    @param: verbose - Print a message when no entry is found (level >= 1).
    @return: Deserialized JSON dictionary. Empty dictionary when no match was found.
    """
    def get(url):
        """
        Send a get request for the given url and return the response body as text.

        @note: Uses requests when available; falls back to urllib if requests is not found.
        """
        try: import requests
        except ImportError: import urllib.request as requests # pragma: no cover

        # NOTE: the original ignored the "url" parameter and always read the closed-over
        # base_api_link. Fixed to honor the argument.
        try: # pragma: no cover
            # This only works when requests is a local alias to urllib.request
            with requests.urlopen(url) as f: text = f.read()
            result = text.decode("utf-8")
        # This should be the default case for modern systems
        except AttributeError: result = requests.get(url).text #@UndefinedVariable
        # Return result
        return result

    # Immutable link to Google API
    google_api_search = "https://www.googleapis.com/books/v1/volumes"
    base_api_link = posixpath.join(google_api_search,"?q=")+urllib.parse.quote(str(fulltext))

    # Get preliminary information from API
    decoded_text = get(base_api_link)

    try:
        obj = json.loads(decoded_text) # deserializes decoded_text to a Python object
        # Fetch full JSON information from the first matching volume ID
        base_api_link = posixpath.join(google_api_search,obj["items"][0]["id"])

        # Get extended information from the API
        decoded_text = get(base_api_link)
        JSON = json.loads(decoded_text) # deserializes decoded_text to a Python object
    # Catch malformed JSON as well as missing "items" entries - but not SystemExit et al.
    except Exception:
        if verbose >= 1: print("No matching entry found")
        JSON = dict()

    # Return serialized object
    return JSON
1037
1038@autotest(arg=0)
def GetTemporaryFileName(arg=None, filename="Temp", extension=".cpd", **kwargs):
    """
    Create a temporary file name with extension *.cpd by default.

    @param: arg - Optional integer seed for random number generation.
    @param: filename - Base name of the temporary file. Defaults to "Temp".
    @param: extension - File extension. Defaults to ".cpd". The legacy keyword
                        "ending" takes precedence when given.
    @return: e.g. "Temp42.cpd".
    """
    # Seed deterministically when an integer seed is given; reseed from
    # system entropy otherwise. (The two original branches were otherwise identical.)
    if isinstance(arg, six.integer_types): rd.seed(arg)
    else: rd.seed()
    randTempInteger = rd.randint(1,1000)
    # Added backwards compatibility for the legacy keyword "ending"
    return filename + str(randTempInteger) + kwargs.get("ending",extension)
1055
1056@autotest("'%s'" % site.getusersitepackages())
def GetPathConversion(path, target_os=None, **kwargs): # pragma: no cover
    """
    Return the given absolute path in its Linux/Windows counter part.

    @param: path - Absolute input path, optionally wrapped in single or double quotes.
    @param: target_os - Requested target platform ("linux"/"windows"). Defaults to the current platform.
    @param: use_linux_drive - Treat the first entry of a Linux path as a Windows drive. Defaults to True.
    """
    current_os = target_os
    if current_os == None: current_os = GetPlatform()
    # Preserve surrounding quotation marks, if any
    if path[0] == path[-1] and path[0] in ["'",'"']:
        quote = path[0]
        path = path[1:-1]
    else: quote = None
    ## Fetch the requested target platform
    if current_os.lower() == "linux":
        target = posixpath; pristine = ntpath
    else:
        target = ntpath; pristine = posixpath

    if os.path.splitdrive(path)[0]:
        # Input path is Windows style
        converted_path = target.join(AsDrive(os.path.splitdrive(path)[0][0], sep=target.sep), *os.path.splitdrive(path)[-1].replace(pristine.sep, target.sep).split(target.sep)[1:])
    # NOTE: the original compared the bound method "current_os.lower" (missing call
    # parentheses) with GetPlatform(), which was always unequal. Fixed to call lower(),
    # so a path already matching the current platform falls through unchanged.
    elif current_os.lower() != GetPlatform():
        converted_path = ntpath.abspath(path)
        # Treat first entry of Linux path as a Windows drive. Defaults to True.
        if len(path.split(pristine.sep)) ==1: pristine = target
        if kwargs.get("use_linux_drive",True): converted_path = target.join(AsDrive(path.split(pristine.sep)[1:][0], sep=target.sep),*path.split(pristine.sep)[2:])
    else:
        # Copy path to output
        converted_path = path
    # Apply all quotations
    if quote:
        path = quote + path + quote
        converted_path = quote + converted_path + quote
    return converted_path
1090
1091@autotest("PyXMake")
def GetIterableAsList(Iterable):
    """
    Walk through an iterable input set and store the results in a list.

    @param: Iterable - Any iterable. (The original required len() and index access;
                       list() returns the identical result for such sequences.)
    @return: A new list containing all elements in iteration order.
    """
    return list(Iterable)
1100
def GetMergedRepositories(RepoID, RepoList, RepoBranch= {}, MergeBranch = [], scratch=os.getcwd(), **kwargs): # pragma: no cover
    """
    Merge multiple GIT repositories into one repository named RepoID, optionally
    moving each source into a sub-folder and/or a dedicated branch.

    @param: RepoID - Name of the combined output repository.
    @param: RepoList - List of (url, package[, subfolder]) entries.
    @param: RepoBranch - Mapping of sub-folder name to the branch created for it.
    @param: MergeBranch - Branch to merge from each repository. Defaults to master for all.
    @param: scratch - Working directory. Defaults to the current working directory.
    @param: keep_subfolder - Keep the merged sub-folders on master. Defaults to True.
    @return: 0 on success.
    @raise: NotImplementedError when git/git_filter_repo are missing or input sizes mismatch.
    """
    try:
        import git
        import git_filter_repo
    except ImportError:
        raise NotImplementedError

    url_delimn = "/"
    master = "master"

    repo_branch = RepoBranch
    # Use the given merge branches; default every repository to master otherwise.
    # (The original left merge_branch undefined when MergeBranch was supplied,
    # raising a NameError in the length check below.)
    merge_branch = MergeBranch if MergeBranch else ["master" for _ in RepoList]

    # Check if arrays have the same size. Raise an error if that is not the case.
    if len(merge_branch) != len(RepoList):
        raise NotImplementedError

    # Create an iterator.
    merge_branch = iter(merge_branch)

    # Operate fully in a temporary directory
    with ChangedWorkingDirectory(scratch):
        if not os.path.exists(RepoID):
            # Initialize a new repository named RepoID
            g = git.Repo.init(RepoID) #@UndefinedVariable
        else:
            # Update existing repository
            g = git.Repo(RepoID)
        # Iterate through all given repositories
        for repo in RepoList:
            add_repo = repo[0]; add_package = repo[1]
            # Check if an additional sub-folder name has been given
            try: add_subfolder = repo[2]
            except: add_subfolder = ""

            # Iterate through given branches. Defaults to master branch if not given otherwise.
            add_branch = next(merge_branch)

            # Clone the given repositories
            _ = git.Repo.clone_from(add_repo, add_package) #@UndefinedVariable
            if add_subfolder:
                # Create a new sub-folder with the content of the repository. Only if a sub-folder was given.
                command = " ".join([sys.executable,git_filter_repo.__file__,"--to-subdirectory-filter",add_subfolder])
                _.git.execute(shlex.split(command,posix=not os.name.lower() in ["nt"]))

            # Merge GIT repositories - allowing unrelated histories.
            g.git.remote("add",add_package, "../"+add_package)
            g.git.fetch(add_package,"--tags")
            g.git.merge("--allow-unrelated-histories",url_delimn.join([add_package,add_branch]))
            g.git.remote("remove",add_package)

            # Again, avoid race condition. If it is still happening, retry again until success.
            while True:
                time.sleep(1)
                try: DeleteRedundantFolders(add_package, ignore_readonly=True); break
                except: pass

        # Create additional branches with the content of the original repositories
        for repo in RepoList:
            # Check if a branch directory was given.
            if not repo_branch: break
            try: add_subfolder = repo[2]
            except: add_subfolder = ""

            g.git.switch(master)

            if add_subfolder:
                g.git.checkout(master, b=repo_branch[add_subfolder])
                command = " ".join([sys.executable,git_filter_repo.__file__,"--force","--subdirectory-filter",add_subfolder+"/","--refs",repo_branch[add_subfolder]])
                g.git.execute(shlex.split(command,posix=not os.name.lower() in ["nt"]))

        g.git.switch(master)

        if not kwargs.get("keep_subfolder",True):
            for repo in RepoList:
                # Check if a branch directory was given.
                if not repo_branch: break
                try: add_subfolder = repo[2]
                except: break
                DeleteRedundantFolders(os.path.join(os.path.abspath(os.getcwd()),RepoID,add_subfolder), ignore_readonly=True)
            try:
                g.git.add(".")
                g.git.commit("-m", "Added branches")
            except: pass

    # Return error code
    return 0
1191
def GetDockerContainer(container, encoding="utf-8"): # pragma: no cover
    """
    Check if a given container is active and running. Returns a boolean value
    (or -1 when docker is not installed).

    @author: garb_ma
    @param: container
    @type: string
    """
    # Check if executable exists on the current machine
    if not GetExecutable("docker"):
        print("==================================")
        # The original message wrongly referred to docker-compose here.
        print("Docker executable not found. Please install Docker")
        print("==================================")
        return -1

    command = " ".join(["docker","ps","--filter", "name="+container])
    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=not IsDockerContainer())
    logs, _ = p.communicate()
    ## Extract result string. Output ends with "NAMES" if no container is found. Otherwise, a string containing information
    # about the running container is appended. Since only a boolean value is required, the check for names is sufficient.
    return not logs.decode(encoding,errors='ignore').replace('\n', '').endswith("NAMES")
1213
def GetDockerRegistry(registry, port, https_keypath="", https_pemfile="", GUI=False, **kwargs): # pragma: no cover
    """
    Check if a given registry is active and running. If not, start a local docker registry
    at the given port and name. Optionally, secure the connection by a HTTPS certificate (PEM)
    and alter the base image.

    @author: garb_ma
    @param: registry
    @type: string
    """
    # Check if executable exists on the current machine
    if not GetExecutable("docker"):
        print("==================================")
        print("Docker executable not found. Please install Docker")
        print("==================================")
        return -1

    # Check if the given container is already active and running
    if GetDockerContainer(registry):
        print("==================================")
        print("Container is already running")
        print("==================================")
        return -1

    # Assemble base command
    command=["docker","run","-d","--restart=always","--name",str(registry),"-e","REGISTRY_HTTP_ADDR=0.0.0.0:"+str(port),"-p",":".join([str(port)]*2)]
    # Check if HTTPS support should be enabled.
    if all([https_keypath, https_pemfile]):
        command.extend(["-v",str(https_keypath)+":/certs","-e","REGISTRY_HTTP_TLS_CERTIFICATE=/certs/"+str(https_pemfile),
                        "-e","REGISTRY_HTTP_TLS_KEY=/certs/"+str(https_pemfile)])
    # Provide base image name (from which to spawn the registry)
    command.extend([kwargs.get("registry_base","registry:latest")])
    # Execute command. Use a shell on Windows only - shell=True combined with a list
    # silently drops all arguments on Linux (matches the sibling docker helpers).
    p = subprocess.check_call(command, shell=not GetPlatform() in ["linux"])

    if GUI:
        # Assemble GUI command
        command = ["docker","run","-d","--restart=always","--name",str(registry)+"_ui", "-p",kwargs.get("ui_port",str(int(str(port))+50))+":80"]
        # NOTE: the original passed the function object GetHostIPAddress itself; it has to be called.
        command.extend(["-e","REGISTRY_HOST="+str(GetHostIPAddress()),"-e","REGISTRY_PORT="+str(port)])
        # Check HTTPS support
        if all([https_keypath, https_pemfile]): command.extend(["-e","REGISTRY_PROTOCOL=https"])
        # Finalize command
        command.extend(["-e","SSL_VERIFY=false", "-e","ALLOW_REGISTRY_LOGIN=true","-e","REGISTRY_ALLOW_DELETE=true",
                        "-e","REGISTRY_PUBLIC_URL="+":".join([str(socket.gethostname()),str(port)]),kwargs.get("ui_base","parabuzzle/craneoperator:latest")])
        # Run with GUI support
        subprocess.check_call(command, shell=not GetPlatform() in ["linux"])

    # Return error code
    return p
1262
def GetDockerUI(name="portainer", image="portainer/portainer-ce:latest"): # pragma: no cover
    """
    Create a custom web UI for docker using Portainer.

    @author: garb_ma
    @param: name
    @type: string
    """
    # Docker itself must be available on this machine
    if not GetExecutable("docker"):
        print("==================================")
        print("Docker executable not found. Please install Docker")
        print("==================================")
        return -1

    # Nothing to do when the UI container already runs
    if GetDockerContainer(name):
        print("==================================")
        print("Container is already running")
        print("==================================")
        return -1

    # Build the complete run command in one go: detached container with a fixed
    # name, automatic restart, the default Portainer ports and both volumes.
    run_command = [
        "docker","run","-d",
        "--name=%s" % name, "--restart=always",
        "-p","8000:8000","-p","9000:9000","-p","9443:9443",
        "-v","/var/run/docker.sock:/var/run/docker.sock",
        "-v","portainer_data:/data",
        image]
    subprocess.check_call(run_command, shell=not GetPlatform() in ["linux"])
    # Return success code
    return 0
1294
def GetDockerRunner(runner="gitlab-runner",restart_policy="always", image="",token="", executor="shell",
                    url='https://gitlab.dlr.de/', tags="docker,linux", ssl_verify=True, **kwargs): # pragma: no cover
    """
    Create and register a GitLab runner within the local Docker environment. When the
    keyword "as_script" points into an existing directory, a self-contained installation
    shell script (Linux only) is written there instead and its path is returned.

    @author: garb_ma
    @param: runner
    @type: string
    """
    # Return GitLab Runner installation script (Linux only).
    if kwargs.get("as_script","") and os.path.exists(os.path.dirname(kwargs.get("as_script",""))):
        script = '''\
        #!/bin/sh
        # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
        # % Shell script for Docker/Linux (x64) %
        # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
        # Shell script for creating a GitLab runner programmatically
        # Created on 22.04.2022
        #
        # Version: 1.0
        # -------------------------------------------------------------------------------------------------
        # Requirements and dependencies:
        # -
        #
        # Changelog:
        # - Created // mg 22.04.2022
        #
        # -------------------------------------------------------------------------------------------------
        # Process through all command line arguments (if given).
        for i in "$@"
        do
        case $i in
        -t=*|--token=*)
        token="${i#*=}"
        shift
        ;;
        -u=*|--url=*)
        url="${i#*=}"
        shift
        ;;
        -i=*|--image=*)
        image="${i#*=}"
        shift
        ;;
        -d=*|--description=*)
        description="${i#*=}"
        shift
        ;;
        -e=*|--executor=*)
        executor="${i#*=}"
        shift
        ;;
        -b=*|--base=*)
        base="${i#*=}"
        shift
        ;;
        -l=*|--locked=*)
        locked="${i#*=}"
        shift
        ;;
        -s=*|--system=*)
        system="${i#*=}"
        shift
        ;;
        -r=*|--return=*)
        return="${i#*=}"
        shift
        ;;
        *) # Unknown option
        ;;
        esac
        done
        # Evaluate user settings
        token=${token:-""}
        url=${url:-"https://gitlab.dlr.de/"}
        system=${system:-"local"}
        image=${image:-"harbor.fa-services.intra.dlr.de/dockerhub/gitlab/gitlab-runner:latest"}
        executor=${executor:-"docker"}
        base=${base:-"alpine:latest"}
        description=${description:-"fa-docker"}
        locked=${locked:-"false"}
        return=${return:-"true"}
        # Procedure
        command=$( echo "docker run --rm -v gitlab-runner-config:/etc/gitlab-runner ${image}" )
        if [ "$system" != "dind" ]; then
        download="https://packages.gitlab.com/install/repositories/runner/gitlab-runner"
        command=$( echo "gitlab-runner" )
        # Debian, Ubuntu and Mint
        ( command -v apt >/dev/null 2>&1 ; ) && ( curl -L "$download/script.deb.sh" | bash ) && ( apt-get install gitlab-runner ) ;
        # RHEL, CentOS and Fedora
        ( command -v yum >/dev/null 2>&1 ; ) && ( curl -L "$download/script.rpm.sh" | bash ) && ( yum install gitlab-runner ) ;
        else
        # Docker in Docker
        docker volume create gitlab-runner-config
        docker run -d --name gitlab-runner --restart always -v gitlab-runner-config:/etc/gitlab-runner -v /var/run/docker.sock:/var/run/docker.sock --env TZ=DE -i ${image}
        fi
        $command register --non-interactive --executor ${executor} --docker-image ${base} --url ${url} --registration-token ${token} --description ${description} --tag-list ${executor} --run-untagged=true --locked=${locked} --access-level=not_protected --docker-privileged --docker-volumes /certs/client
        # Create a cron job deleting all leftover data in regular intervals.
        echo 'docker system prune --all --volumes --force' > cron.sh
        # Resume to main shell. Deactivate in Docker to keep the container running endlessly.
        if [ "$return" = "false" ]; then tail -f /dev/null; fi
        exit 0'''
        # NOTE: the "-s=*|--system=*" case above previously assigned "return" instead of
        # "system", making the dind branch unreachable through the command line flag.
        path = kwargs.get("as_script")
        with open(path,"w") as f: f.write(textwrap.dedent(script))
        print("==================================")
        print("Writing GitLab Runner Runtime script to: ")
        print("%s " % path)
        print("==================================")
        return os.path.abspath(path)

    # Check if executable exists on the current machine
    if not GetExecutable("docker"):
        print("==================================")
        print("Docker executable not found. Please install Docker")
        print("==================================")
        return -1

    # Check if the given container is already active and running
    if not GetDockerContainer(runner):
        # Expose docker host socket to runner - add additional / to host path if started from Windows.
        # Renamed from "socket" to avoid shadowing the module-level socket import.
        docker_socket = "/var/run/docker.sock:/var/run/docker.sock"
        if GetPlatform() == "windows": docker_socket = docker_socket[0] + docker_socket
        # Create a local docker volume to store all relevant information
        subprocess.check_call(["docker","volume","create",runner+"-config"], shell=not GetPlatform() in ["linux"])
        # Assemble command
        command = ["docker","run","-d","--name",runner,"--restart",restart_policy,"-v",runner+"-config"+":/etc/gitlab-runner","-v",docker_socket,"--env","TZ=DE",
                   "-i","gitlab/gitlab-runner:latest"]
        # Install a GitLab runner into current docker environment
        subprocess.check_call(command, shell=not GetPlatform() in ["linux"])
        if image:
            subprocess.check_call(["docker","tag","gitlab/gitlab-runner:latest",image], shell=not GetPlatform() in ["linux"])
            subprocess.check_call(["docker","image","rm","gitlab/gitlab-runner:latest"], shell=not GetPlatform() in ["linux"])
            _run_image = image
        else:
            _run_image="gitlab/gitlab-runner:latest"
        if token:
            # If token is given, additionally establish connection with given parameters
            command = ['docker','run','--rm','-v',runner+"-config"+":/etc/gitlab-runner",_run_image,'register','--non-interactive','--executor',executor,'--docker-image','alpine:latest',
                       '--url',url,'--registration-token',token,'--description',str(runner),'--tag-list',tags,'--run-untagged=true','--locked=false','--access-level=not_protected',
                       '--docker-privileged']
            # Only if SSL verification is active
            if ssl_verify: command.extend(["--docker-volumes","/certs/client"])
            subprocess.check_call(command, shell=not GetPlatform() in ["linux"])
    else:
        print("==================================")
        print("Container is already running")
        print("==================================")

    return 0
1445
def GetDockerPorts(**kwargs): # pragma: no cover
    """
    Get all active Docker ports. Either from a given container, by a list of containers or all active containers.
    Defaults to all active containers. Supports WSL2 with Docker installed.

    @author: garb_ma
    @param: container
    @type: string
    """
    # Check if executable exists on the current machine
    if not GetExecutable("docker"):
        print("==================================")
        print("Docker executable not found. Please install Docker")
        print("==================================")
        return -1

    # Get docker's absolute execution path
    _, docker = GetExecutable("docker", get_path=True)
    if not docker.lower().endswith(".bat"): docker = "docker"

    data = []; ports = [];
    # Collect all ports from given containers. Defaults to every currently running container id.
    active_container = kwargs.get("container"," ".join(subprocess.check_output((["sudo"] if IsDockerContainer() else []) + [docker,"ps","--quiet"]).decode("utf-8").split()).split("--quiet")[-1].split())
    # Wrap a single container name in a list. (The original used list(...), which
    # splits a string into its individual characters.)
    if isinstance(active_container, str): active_container = [active_container]
    for x in active_container: data.extend(" ".join(subprocess.check_output((["sudo"] if IsDockerContainer() else []) + [docker,"port",x]).decode("utf-8").split()).split(x)[-1].split("->"))
    for x in data:
        port = (x.split(":")[-1] if ":" in x else "").split(" ")[0];
        if IsNotEmpty(port): ports.append(int(port))

    # Return all active ports as a list of integers
    return list(set(ports))
1477
1478@autotest("user","password")
def GetDockerEncoding(*args, **kwargs):
    """
    Create a base64 encoded string from a given username and password combination
    used for Docker authentication.

    @param: args - Username, password (any number of values, joined with "delimn").
    @param: delimn - Separator between the given values. Defaults to ":".
    @param: encoding - Text encoding. Defaults to "utf-8".
    @return: Base64 encoded string, or None when no values were given.
    @author: garb_ma
    """
    import base64
    # No values have been given. Return immediately.
    if not args: return
    encoding = kwargs.get("encoding",'utf-8')
    # Merge all given values with a user-defined delimiter. (The original first
    # assigned str(args[0]) only to overwrite it unconditionally - dead code removed.)
    message = kwargs.get("delimn",":").join([str(x) for x in args])
    # Encode the combined string
    message_bytes = message.encode(encoding)
    base64_bytes = base64.b64encode(message_bytes)
    return base64_bytes.decode(encoding)
1498
1499@autotest()
1501 """
1502 Check whether current package lives inside a docker container.
1503 """
1504 path = '/proc/self/cgroup'
1505 return (os.path.exists('/.dockerenv') or os.path.isfile(path) and any('docker' in line for line in open(path))) or GetExecutable("cexecsvc")
1506
1507@autotest("data")
def InQuotes(s,quote='"'):
    """
    Return the given value wrapped in the requested quotation character.
    """
    wrapper = str(quote)
    return "".join([wrapper, str(s), wrapper])
1513
1514@autotest(" ")
1516 """
1517 Check whether a string is empty and/or not given. Returns True otherwise.
1518 """
1519 return bool(s and s.strip())
1520
1521@autotest("3551551677")
def IsValidISBN(isbn):
    """
    Check whether a given string corresponds to a valid ISBN address.
    """
    import re

    # Normalize: drop separators and force upper case before matching
    normalized = isbn.replace("-", "").replace(" ", "").upper()
    match = re.search(r'^(\d{9})(\d|X)$', normalized)
    if not match: # pragma: no cover
        return False

    # Split into the nine leading digits and the trailing check character
    body, tail = match.groups()
    expected = 10 if tail == 'X' else int(tail)

    # Weighted checksum: position (1-based) times digit, summed over the body
    weighted = sum(pos * int(ch) for pos, ch in zip(range(1, 10), body))

    # A valid ISBN-10 satisfies weighted sum mod 11 == check digit
    return weighted % 11 == expected
1542
1543@autotest()
def IsWSL():
    """
    Detect if the script is running inside WSL or WSL2 on windows.

    @note: WSL is thought to be the only common Linux kernel with Microsoft in the name, per Microsoft:
    https://github.com/microsoft/WSL/issues/4071#issuecomment-496715404

    @author: https://www.scivision.dev/python-detect-wsl/
    """
    kernel_release = platform.uname().release
    return 'Microsoft' in kernel_release
1554
def AsDrive(s, sep=os.path.sep):
    """
    Return s as drive to start an absolute path with path.join(...).
    """
    if sep == ntpath.sep: # pragma: no cover
        # Windows: "C" becomes "C:\\"
        return ntpath.join(s+":",ntpath.sep)
    # Linux: "home" becomes "/home"
    return posixpath.join(posixpath.sep,s)
1566
def Popen(command, verbosity=1, encoding="utf-8", **kwargs): # pragma: no cover
    """
    Run command line string "command" in a separate subprocess.
    Show output in current console window in dependence of verbosity level:
    - 0 --> Quiet
    - 1 --> Only show errors
    - 2 --> Show every command line output.

    @author: garb_ma
    @param: command, verbosity
    @type: string, integer
    @param: shell - Override shell usage. Defaults to a shell outside docker containers
                    or whenever the command is given as one string.
    @param: raw - Return the live process object without consuming its streams.
    @param: collect - Gather all output at the end (True) or stream it line by line (False).
    @param: env - Environment for the child. Defaults to a copy of the current environment.
    @param: replace - Substring stripped from decoded output. Defaults to newline.
    """
    # Use a shell outside of docker containers, or whenever the command is one string.
    shell = kwargs.get("shell",not IsDockerContainer() or isinstance(command, six.string_types))
    ## Output standard and error messages in dependence of verbosity level.
    if shell and not kwargs.get("collect",True):
        # Uncollected shell execution: inherit the parent's streams and block until done.
        p = subprocess.check_call(command)
        return p
    # universal_newlines mirrors "raw": text-mode streams only for the raw pass-through case.
    p = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, shell=shell, env=kwargs.get("env",os.environ.copy()), universal_newlines=kwargs.get("raw",False))
    # Define output options.
    if kwargs.get("raw",False):
        # Hand the live process back to the caller; streams are not consumed here.
        return p
    elif kwargs.get("collect",True):
        stdout, stderr = p.communicate()
        # Output standard and error messages in dependence of verbosity level.
        if verbosity >= 2 and IsNotEmpty(stdout):
            print((stdout.decode(encoding,errors='ignore').replace(kwargs.get("replace","\n"), '')))
        if verbosity >= 1 and IsNotEmpty(stderr):
            print((stderr.decode(encoding,errors='ignore').replace(kwargs.get("replace","\n"), '')))
    else:
        ## Output standard output and parse it directly as is flows in
        while True:
            stdout = p.stdout.readline()
            # Poll process for new output until finished
            if not IsNotEmpty(stdout) and p.poll() is not None:
                break
            if verbosity >= 2 and IsNotEmpty(stdout):
                sys.stdout.write(stdout.decode(encoding,errors='ignore').replace(kwargs.get("replace","\n"), ''))
                sys.stdout.flush()
        # Collect error messages and parse them directly to system error output
        _, stderr = p.communicate()
        if verbosity >= 1 and IsNotEmpty(stderr):
            sys.stderr.write(stderr.decode(encoding,errors='ignore').replace(kwargs.get("replace","\n"), ''))
            sys.stderr.flush()
    # Return subprocess
    return p
1613
def SSHPopen(ssh_client, command, verbosity=1, **kwargs): # pragma: no cover
    """
    Run command line string "command" in a separate SSH client process.
    Show output in current console window in dependence of verbosity level:
    - 0 --> Quiet
    - 1 --> Only show errors
    - 2 --> Show every command line output.

    @author: garb_ma
    @param: command, verbosity
    @type: string, integer
    @param: tty - Request a pseudo terminal for the remote command. Defaults to False.
    @param: collect - Gather all output at the end (True) or stream it as it arrives (False).
    @return: Exit status of the remote command.
    """
    # ssh_client is expected to offer a paramiko-style exec_command interface
    # returning (stdin, stdout, stderr) -- NOTE(review): confirm against callers.
    _, stdout, stderr = ssh_client.exec_command(command, get_pty=kwargs.get("tty",False))

    # Return results directly
    if not kwargs.get("collect",True) and verbosity >= 1:
        # Stream stdout in 2 KiB chunks until the remote side closes the channel.
        for line in iter(lambda: stdout.read(2048).decode('utf-8','ignore'),""):
            try:
                # The eval keeps this module importable on Python 2, where a literal
                # print(line, end="") would be a syntax error at compile time.
                _ = line; eval('print(line, end="")');
            except: pass
        return stdout.channel.recv_exit_status()

    # Output is collected and parsed at the end. recv_exit_status blocks until
    # the remote command has terminated.
    sexit = stdout.channel.recv_exit_status()

    sout = stdout.readlines()
    serr = stderr.readlines()

    if verbosity >= 2 and sout:
        print("".join(sout))
    if verbosity >= 1 and serr:
        print("".join(serr))

    # Return the remote exit status
    return sexit
1648
def ConvertExcel(excel_file, output=None, **kwargs): # pragma: no cover
    """
    Utility function to convert a given *XLSX file to an *XLS.

    @param: excel_file - Path to the input Excel file.
    @param: output - Output folder. Defaults to the current working directory.
    @param: silent_install - Install libreoffice inside a Docker container when missing.
    @return: Absolute path of the converted *XLS file.
    @raise: NotImplementedError when no conversion backend is available.
    """
    # Get user-defined output folder. Defaults to current working directory
    outdir = output
    if not outdir: outdir = os.getcwd()
    excel = os.path.abspath(excel_file)
    # Create a temporary Powershell script name
    ps = ".".join([str(next(tempfile._get_candidate_names())).replace("_",""),"ps1"])
    # Check operating system and available options
    if GetPlatform() in ["windows"] and GetExecutable("powershell"):
        # Execute powershell script on Windows
        command = ["powershell.exe", '.\\'+ps, PathLeaf(excel)]
    else:
        # We may be in a Docker container. Attempt a silent install of libreoffice.
        if IsDockerContainer() and not GetExecutable("libreoffice") and kwargs.get("silent_install",IsDockerContainer()):
            subprocess.check_call(["sudo","apt-get","update"])
            subprocess.check_call(["sudo","apt-get","install","-y","libreoffice"])
        # At this point libreoffice has to exist. If not, raise an error.
        if not GetExecutable("libreoffice"): raise NotImplementedError
        ## Use libreoffice for conversion on Linux. (The original left "command"
        # unassigned after the silent-install path, raising a NameError below.)
        command = ["libreoffice","--headless","--convert-to", "xls",PathLeaf(excel)]
    # Operate fully in a temporary directory
    with TemporaryDirectory():
        xlsx = PathLeaf(excel)
        shutil.copyfile(excel, xlsx)
        # Populate the temporary script. Open the input Excel file and convert to legacy version
        with open(ps, 'w') as f:
            f.write(textwrap.dedent('''\
                param ($File)
                $myDir = split-path -parent $MyInvocation.MyCommand.Path
                $excelFile = "$myDir\\" + $File
                $Excel = New-Object -ComObject Excel.Application
                $wb = $Excel.Workbooks.Open($excelFile)
                $out = "$myDir\\" + (Get-Item ("$myDir\\" + $File) ).Basename + ".xls"
                $Excel.DisplayAlerts = $false;
                $wb.SaveAs($out, 56)
                $Excel.Quit()
                '''))
        # Execute command. On Linux, ensure that the dynamic link path is correct.
        # Fix for Issue: https://askubuntu.com/questions/1237381/undefined-symbol-error-when-starting-libreoffice-in-ubuntu-20-04
        subprocess.check_call(command, env=os.environ.copy()
            if not GetPlatform() in ["linux"] else dict(os.environ, LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libfreetype.so.6"))
        shutil.copyfile(xlsx.replace(".xlsx",".xls"),os.path.join(outdir,xlsx.replace(".xlsx",".xls")))
    # Return the output path of the file created by this function
    return os.path.join(outdir,xlsx.replace(".xlsx",".xls"))
1701
def CreateArchive(output, source=None, exclude=[".git",".svn","__pycache__"], **kwargs):
    """
    Create an archive from the given source directory. Defaults to the current project.

    @param: output - Absolute output path
    @param: source - Source path. Defaults to the current project.
    @param: exclude - Files and directories to be ignored
    @return: True when the output file exists afterwards.
    """
    # Additional internal filter functions
    def tar_exclude(tarinfo):
        """
        Exclude pattern for tar: drop entries whose path contains an excluded name.
        """
        local = set(tarinfo.name.split(posixpath.sep))
        if local.intersection(set(exclude)): return None
        return tarinfo

    # Defaults to this project
    from_source = GetPyXMakePath()
    if source: from_source = source

    # Get absolute paths
    output_filename = os.path.abspath(output)
    source_dir = os.path.abspath(from_source)

    # Decide the archive format from the FINAL suffix. (The original used split(".")[1],
    # which misclassifies multi-dot names like "a.b.zip" and crashes on extensionless output.)
    if PathLeaf(output_filename).split(".")[-1].lower() in ["zip"]:
        # Context manager guarantees the archive is closed even on error
        with zipfile.ZipFile(output_filename,"w", zipfile.ZIP_DEFLATED) as archive:
            rootlen = len(source_dir) + 1
            # Iterate through the given folder. Skip predefined folders
            for dirpath, _, filenames in os.walk(source_dir):
                for filename in filenames:
                    # Write the file named filename to the archive
                    filepath = os.path.join(dirpath, filename)
                    parentpath = os.path.relpath(filepath, source_dir)
                    # Do not include .git or .svn folders!
                    if parentpath.startswith((".")) or any(PathLeaf(filepath) == x for x in exclude): continue
                    if any(x in filepath.replace(PathLeaf(filepath),"") for x in exclude): continue
                    # Add path to archive
                    archive.write(filepath, filepath[rootlen:])
    else:
        # Allow modification of compression level
        compression = PathLeaf(output_filename).split(".")[-1]
        mode = "w" + "%s" % (":"+compression if compression in ["gz","bz2","xz"] else "")
        with tarfile.open(output_filename, mode) as archive:
            archive.add(source_dir, arcname=os.path.basename(source_dir), filter=tar_exclude)

    # Does the output file exist
    return os.path.exists(output)
1751
def GetResizedImage(image, size, **kwargs):
    """
    Resize a given image (absolute path) and return a resized image object.

    @param: image - Source path. Must be an absolute path to the icon
    @param: size - Single integer value representing the new icon size
    @param: base_offset - Optional keyword: padding (in pixels) added when the
            source image carries no border of its own. Defaults to 20, clamped to >= 2.
    """
    # Local imports
    from PIL import Image, ImageOps, ImageChops
    # Utility function definitions
    def hasPadding(im):
        """
        Utility function to check for the existence of an image border.
        """
        bg = Image.new(im.mode, im.size, im.getpixel((0,0)))
        diff = ImageChops.difference(im, bg)
        diff = ImageChops.add(diff, diff, 2.0, -100)
        bbox = diff.getbbox()
        # A uniform (single colour) image has no content bounding box at all.
        # Treat it as already padded so no extra border is added. Previously
        # this case crashed with a TypeError when subscripting None.
        if bbox is None: return True
        return all((bbox[0], bbox[1], (bbox[0] + bbox[2]) <= im.size[0], (bbox[1] + bbox[3]) <= im.size[1]))
    # Fetch user input
    base_image = copy.deepcopy(image); base_width = size
    base_offset = int(max(2,kwargs.get("base_offset",20)))
    # Open the image and check for a predefined border
    icon = Image.open(base_image)
    # Only add padding when there is no initial border
    base_padding = not hasPadding(icon) and base_offset >= 0
    # If no border is found, reserve room for the one added below
    if base_padding: base_width -= base_offset
    # Resize the image to its optimal proportions (keep the aspect ratio)
    wpercent = (base_width / float(icon.size[0]))
    hsize = int((float(icon.size[1]) * float(wpercent)))
    # Resample the image. Image.ANTIALIAS was removed in Pillow >= 10; prefer
    # the modern constant and fall back to the legacy name for old installations.
    try: resample = Image.Resampling.LANCZOS
    except AttributeError: resample = Image.ANTIALIAS
    icon = icon.resize((base_width, hsize), resample)
    # Add an additional border filled with the corner colour
    if base_padding: icon = ImageOps.expand(icon, border=(int(base_offset//2),)*4, fill=icon.getpixel((0,0)) )
    # Return an image object
    return icon
1790
def ConcatenateFiles(filename, files, source=None, ending=''):
    """
    Concatenate all files into one.

    @param: filename - Path of the combined output file
    @param: files - Iterable of file names, resolved relative to source.
            Entries without an extension get the given ending appended.
    @param: source - Base directory of the input files. Defaults to the current
            working directory at call time.
    @param: ending - Default file extension for entries without one
    """
    # Resolve the default lazily: os.getcwd() as a default argument value would
    # be frozen to the working directory active at import time.
    if source is None: source = os.getcwd()
    files_read = 0

    # Concatenate all files into one output file
    with open(filename,'wb') as wfd:
        for candidate in files:
            # Append the default ending when the entry has no extension of its own
            if not str(os.path.splitext(candidate)[1]).strip(): candidate += ending
            f = os.path.join(source, candidate)
            # Only attempt to open a file if it exists
            if not os.path.exists(f): continue
            # All files at this point exist
            if files_read > 0:
                # Add an empty line between two source file includes
                wfd.write("\n\n".encode())
            with open(f,'rb') as fd:
                shutil.copyfileobj(fd, wfd, 1024*1024*10)
            files_read += 1
1810
def ReplaceTextinFile(filename, outname, replace, inend='', outend='', source=None, **kwargs):
    """
    Replace all occurrences of the keys of replace in filename, writing the result to outname.

    @param: filename - Input file name, resolved relative to source
    @param: outname - Output file name
    @param: replace - Dictionary mapping search strings to their replacements
    @param: inend / outend - Optional suffixes appended to the file names
    @param: source - Base directory of the input file. Defaults to the current
            working directory at call time.
    @param: error_handling - Optional keyword: decoding error handler for io.open.
            Defaults to "replace".
    """
    # Resolve the default lazily: os.getcwd() as a default argument value would
    # be frozen to the working directory active at import time.
    if source is None: source = os.getcwd()
    Inputfile = os.path.join(source,filename+inend)
    Outputfile = outname+outend
    try:
        # Modern convention: explicit utf-8 with a configurable error handler
        with io.open(Inputfile, errors=kwargs.get("error_handling","replace"), encoding="utf-8") as infile, io.open(Outputfile, 'w', encoding="utf-8") as outfile:
            for line in infile:
                for src, target in replace.items():
                    line = line.replace(src, target)
                outfile.write(line)
    except Exception:
        # Legacy version using the platform default encoding. The former bare
        # except also swallowed SystemExit and KeyboardInterrupt.
        with open(Inputfile) as infile, open(Outputfile, 'w') as outfile:
            for line in infile:
                for src, target in replace.items():
                    line = line.replace(src, target)
                outfile.write(line)
1831
1832@autotest([site.getuserbase(),site.getusersitepackages()]*2, site.getuserbase())
def RemoveDuplicates(ListofStrings,Duplicate=""):
    """
    Remove duplicate occurrences of a given value from a (nested) list of strings.

    Consecutive entries equal to Duplicate are collapsed into a single one;
    a trailing run of duplicates is dropped entirely.
    """
    collected = []; pending = 0

    # Walk over the flattened input, normalizing whitespace on the fly
    for element in list(ArbitraryFlattening(ListofStrings)):
        normalized = " ".join(element.split())
        if normalized == str(Duplicate):
            # Defer emission of duplicates until the next regular entry
            pending += 1
            continue
        if pending:
            # A run of duplicates precedes this entry - keep exactly one
            collected.append(str(Duplicate))
            pending = 0
        collected.append(normalized.strip())

    # Return results
    return collected
1856
def DeleteFilesbyEnding(identifier): # pragma: no cover
    """
    Delete all files from workspace

    @author: Marc Garbade, 26.02.2018

    @param: identifier: A tuple specifying the files to remove.
    @type: Tuple
    """
    # Collect every matching entry of the current directory, then remove them
    matches = [entry for entry in os.listdir(os.getcwd()) if entry.endswith(identifier)]
    for match in matches:
        os.remove(match)
1869
def DeleteRedundantFolders(Identifier, Except=[], ignore_readonly=False): # pragma: no cover
    """
    Delete all redundant folders from the current workspace.

    @param: Identifier - Glob pattern evaluated relative to the current directory
    @param: Except - Unused. Kept for backwards compatibility.
    @param: ignore_readonly - Also remove read-only entries when set.
    """
    def _IgnoreReadOnly(action, name, exc):
        """
        Error callback for shutil.rmtree: make the entry writable and retry.
        """
        os.chmod(name, stat.S_IWRITE)
        # Retry the original failing operation. The callback may receive either
        # a file or a directory; os.remove would fail for the latter.
        action(name)

    # Search for this pattern
    pattern = os.path.join(os.getcwd(), Identifier)

    # Loop over all items in the current working directory
    for item in glob.glob(pattern):
        if not os.path.isdir(item): continue
        # The \\?\ long path prefix is only understood by Windows. Applying it
        # unconditionally broke folder deletion on every other platform.
        directory = (r'\\?\ '.strip()+item) if os.name == "nt" else item
        if ignore_readonly:
            # Delete read-only folders as well
            shutil.rmtree(directory, onerror=_IgnoreReadOnly)
            continue
        # All other cases
        shutil.rmtree(directory)
1894
def AddFunctionToObject(_func, _obj): # pragma: no cover
    """
    Bind a given function to an existing object instance as a method.
    """
    # MethodType binds _obj as the implicit first argument of _func
    bound_method = MethodType(_func, _obj)
    return bound_method
1900
@autotest(type('Class', (object,), {"data": np.array([0, 1, 2, 3])}))
def PrepareObjectforPickling(_obj):
    """
    Prepare a object for pickling and convert all numpy
    arrays to python defaults (2to3 compatible).

    @param: _obj - Object whose __dict__ is sanitized for pickling
    """
    # NOTE: the def line above was lost in the published listing and has been
    # restored from the module index (Utility.py:1902).
    _dictobj = _obj.__dict__.copy()
    # Remember which entries were numpy arrays so they can be restored later
    _dictobj['__np_obj_path'] = []
    for path, value in ObjectWalk(_dictobj):
        if isinstance(value, (np.ndarray, np.generic)):
            # Navigate to the parent container of the array entry
            parent = _dictobj
            for step in path[:-1]: # pragma: no cover
                parent = parent[step]
            # Replace the array in place by its plain python representation
            parent[path[-1]] = value.tolist()
            _dictobj['__np_obj_path'].append(path[-1])
    return _dictobj
1917
1918@autotest({"data": [[0, 1],[2, 3]], "__np_obj_path": ["data"]})
1920 """
1921 Restore the original dictionary by converting python defaults to their
1922 numpy equivalents if required (2to3 compatible).
1923 """
1924 _dictobj = _dict.copy()
1925 # Recreate serialized arrays accordingly
1926 for key, value in _dictobj.items():
1927 if key in _dictobj.get('__np_obj_path',[]):
1928 _dictobj[key] = np.float64(value)
1929 if len(np.shape(value)) == 2:
1930 _dictobj[key] = np.asmatrix(value)
1931 # Return result
1932 return _dictobj
1933
def PathLeaf(path):
    """
    Return the final component (leaf) of an arbitrary path.
    """
    # ntpath handles both forward and backward slashes on every platform
    directory, leaf = ntpath.split(path)
    if leaf:
        return leaf
    # Path ended with a separator - take the last component of the head
    return ntpath.basename(directory)
1940
def ArbitraryFlattening(container):
    """
    Recursively flatten an arbitrarily nested container (lists, tuples, numpy
    arrays) into a flat generator of scalar items.

    @note: The previous docstring was an unrelated copy-paste from
    RecoverDictionaryfromPickling and has been corrected.
    """
    for i in container:
        if isinstance(i, (list,tuple, np.ndarray)):
            # Recurse into nested sequences
            for j in ArbitraryFlattening(i):
                yield j
        else:
            yield i
1952
1953@autotest('{"data": True}')
def ArbitraryEval(expression):
    """
    Evaluate a given expression using ast.literal_eval while treating every string as raw.

    @param: expression - Expression to evaluate. Non-string input is forwarded
            to ast.literal_eval unchanged.
    """
    node_or_string = expression
    # NOTE(review): on non-Windows platforms the expression is wrapped in single
    # quotes, so literal_eval returns the raw string itself rather than parsing
    # it as a python literal; only on Windows is the expression actually parsed.
    # Verify that this platform asymmetry is intentional.
    if isinstance(expression,str): node_or_string = r"%s" % expression if GetPlatform() in ["windows"] else r"'%s'" % expression
    return ast.literal_eval(node_or_string)
1961
def MoveIO(src, dest, verbose=0): # pragma: no cover
    """
    Move given src to defined dest while waiting for completion of the process.
    """
    # Select the platform specific move command
    mover = "move" if GetPlatform() == "windows" else "mv"
    # Delegate to Popen and wait for the operation to finish
    result = Popen(" ".join([mover, src, dest]), verbosity=verbose)
    return result
1974
1975@autotest("Make.py")
def FileWalk(source, path=None):
    """
    Walk through path. Check if all files listed in source are present.
    If True, return them. If False, return all files present in the given path.

    @param: source - A single file name or a list/tuple of file names
    @param: path - Directory to inspect. Defaults to the current working
            directory at call time (previously bound once at import time).
    """
    # Resolve the default lazily
    if path is None: path = os.getcwd()
    # Initialize all variables. Support both list and string input
    files = list([])
    request = source if isinstance(source, (list,tuple,)) else [source]
    # These are all files present in the given directory
    InputFiles = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
    try: # pragma: no cover
        # Check if source code files have been specified. Otherwise, use everything found.
        # A plain truth check replaces the former numpy array comparison.
        if request and InputFiles:
            # Ignore file extensions here. They would interfere with the checks later on.
            if set([os.path.splitext(x)[0] for x in request]).issubset(set([os.path.splitext(f)[0] for f in InputFiles])):
                files.extend(request)
            else:
                files.extend(InputFiles)
    except TypeError:
        files.extend(InputFiles)

    return files
2000
2001@autotest(object, AbstractBase)
def ClassWalk(reference, cls):
    """
    Recursively find a class object by name and return its object from another class.

    @note: Returns None if reference cannot be resolved. Can be safely used with AbstractBase classes and reload.
    """
    result = None
    for base in getattr(cls, "__bases__"):
        if base.__name__ in [reference.__name__]:
            result = copy.deepcopy(base)
        else:
            result = ClassWalk(reference, base)
        # Stop as soon as the reference has been resolved. Previously a later
        # sibling base without a match overwrote an earlier recursive hit with None.
        if result is not None:
            break
    return result
2015
2016@autotest(site.getusersitepackages(), contains="DLR")
def PathWalk(path, startswith=None, endswith=None, contains=None, **kwargs):
    """
    Walk recursively through path. Exclude both folders and files if requested.
    """
    def _matches(name, prefixes, tokens, suffixes):
        """
        Return True when name matches any of the given filter patterns.
        """
        if isinstance(prefixes, (six.string_types, tuple, list)):
            prefix = tuple(prefixes) if isinstance(prefixes, list) else prefixes
            if name.startswith(prefix):
                return True
        if isinstance(tokens, (six.string_types, tuple, list)):
            flattened = tuple(ArbitraryFlattening([tokens]))
            if any(token in name for token in flattened):
                return True
        if isinstance(suffixes, (six.string_types, tuple, list)):
            suffix = tuple(suffixes) if isinstance(suffixes, list) else suffixes
            if name.endswith(suffix):
                return True
        return False

    for root, dirs, files in os.walk(os.path.normpath(path)):
        if kwargs.get("exclude", any([startswith, endswith, contains])):
            files = [f for f in files if not _matches(f, startswith, contains, endswith)]
            # Prune in place so os.walk skips the excluded directories entirely
            dirs[:] = [d for d in dirs if not _matches(d, startswith, contains, endswith)]
        yield root, dirs, files
2043
2044@autotest(AbstractMethod)
def ObjectWalk(obj, path=(), memo=None):
    """
    Walk recursively through nested python objects, yielding (path, leaf) pairs.

    @author: Yaniv Aknin, 13.12.2011

    @param: obj, path, memo
    @type: object, list, boolean
    """
    # py2/py3 compatible item iteration over mappings
    iteritems = lambda mapping: getattr(mapping, 'iteritems', mapping.items)()
    if memo is None:
        memo = set()
    iterator = None
    if isinstance(obj, dict):
        iterator = iteritems
    elif isinstance(obj, (list, set, tuple)):
        # Strings are never instances of list/set/tuple, so the former extra
        # string-type guard (and its six dependency) was dead code.
        iterator = enumerate
    if iterator:
        # The memo set guards against infinite recursion on self-referencing containers
        if id(obj) not in memo:
            memo.add(id(obj))
            for path_component, value in iterator(obj):
                for result in ObjectWalk(value, path + (path_component,), memo):
                    yield result
            memo.remove(id(obj))
    else:
        # A scalar leaf has been reached
        yield path, obj
2072
2073@autotest(sys.executable, terminate=False)
def ProcessWalk(executable, terminate=True):
    """
    Walk through all active processes on the host machine and report (and
    optionally terminate) every process matching the given executable name.

    @author: garb_ma

    @param: executable - Search string matched against all process attributes
    @param: terminate - Kill a matching process when set to True (default)
    """
    # Local third party import (psutil is only required by this function)
    import psutil

    # Iterate through all active processes
    for proc in psutil.process_iter():
        if os.getpid() == proc.pid:
            continue # Skip self.
        try:
            # Scan every attribute of the process for the search string
            for key, value in proc.as_dict().items():
                if executable in str(key) or executable in str(value):
                    proc_delimn = " "; processName = proc.name(); processID = proc.pid;
                    # Only act when the process name itself matches
                    if executable in str(processName): # pragma: no cover
                        if terminate: proc.kill()
                        # NOTE(review): the message always says "SSH process",
                        # regardless of the executable searched for - confirm intent.
                        print("==================================")
                        print("Found SSH process @ " + str(proc_delimn.join([str(processName),':::',str(processID)])))
                        if terminate: print("The process is terminated.")
                        print('==================================')
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # Processes may vanish or become inaccessible while iterating
            pass
2098
## Adding compatibility aliases for the logging function.
# Expose GetLogger additionally under stdlib-like camelCase names.
_logger_reference = getattr(sys.modules[__name__], "GetLogger")
setattr(sys.modules[__name__], "setLogger", _logger_reference)
setattr(sys.modules[__name__], "getLogger", _logger_reference)
del _logger_reference

if __name__ == '__main__':
    pass
Abstract meta class for all data class objects.
__init__(self, *args, **kwargs)
Definition Utility.py:166
classify(cls, *args, **kwargs)
Definition Utility.py:205
__new__(cls, *args, **kwargs)
Definition Utility.py:173
Inherited from built-in object.
__new__(cls, name, package=None, **kwargs)
Definition Utility.py:121
__init__(self, *args, **kwargs)
Definition Utility.py:115
Class to create 2to3 compatible pickling dictionary.
__get__(self, obj=None, objtype=None)
Definition Utility.py:278
Class to create 2to3 compatible pickling dictionary.
Class to create 2to3 compatible pickling dictionary.
Data
Dictionary for further processing.
Definition Utility.py:332
Class to create 2to3 compatible pickling dictionary.
Module containing all relevant modules and scripts associated with the building process.
Definition __init__.py:1
ConcatenateFiles(filename, files, source=os.getcwd(), ending='')
Definition Utility.py:1791
GetIterableAsList(Iterable)
Definition Utility.py:1092
AddFunctionToObject(_func, _obj)
Definition Utility.py:1895
FileOutput(FileName)
Definition Utility.py:539
DeleteRedundantFolders(Identifier, Except=[], ignore_readonly=False)
Definition Utility.py:1870
TemporaryEnvironment(environ={})
Definition Utility.py:459
GetOpenAPIGenerator(output=None, **kwargs)
Definition Utility.py:949
PathWalk(path, startswith=None, endswith=None, contains=None, **kwargs)
Definition Utility.py:2017
GetExecutable(name, get_path=False, **kwargs)
Definition Utility.py:660
AsDrive(s, sep=os.path.sep)
Definition Utility.py:1555
MoveIO(src, dest, verbose=0)
Definition Utility.py:1962
GetDockerContainer(container, encoding="utf-8")
Definition Utility.py:1192
CreateArchive(output, source=None, exclude=[".git",".svn","__pycache__"], **kwargs)
Definition Utility.py:1702
GetPathConversion(path, target_os=None, **kwargs)
Definition Utility.py:1057
InQuotes(s, quote='"')
Definition Utility.py:1508
GetSanitizedDataFromCommand(*args, **kwargs)
Definition Utility.py:859
RecoverDictionaryfromPickling(_dict)
Definition Utility.py:1919
PrepareObjectforPickling(_obj)
Definition Utility.py:1902
GetDockerRunner(runner="gitlab-runner", restart_policy="always", image="", token="", executor="shell", url='https://gitlab.dlr.de/', tags="docker,linux", ssl_verify=True, **kwargs)
Definition Utility.py:1296
GetDockerPorts(**kwargs)
Definition Utility.py:1446
DeleteFilesbyEnding(identifier)
Definition Utility.py:1857
Popen(command, verbosity=1, encoding="utf-8", **kwargs)
Definition Utility.py:1567
ConsoleRedirect(to=os.devnull, stdout=None)
Definition Utility.py:471
GetBibliographyData(fulltext, verbose=0)
Definition Utility.py:992
GetDockerRegistry(registry, port, https_keypath="", https_pemfile="", GUI=False, **kwargs)
Definition Utility.py:1214
GetRequirements(dirname, args=[], check=False, **kwargs)
Definition Utility.py:763
ConvertExcel(excel_file, output=None, **kwargs)
Definition Utility.py:1649
FileUpload(url, filename, header={}, **kwargs)
Definition Utility.py:558
GetLogger(name=None, **kwargs)
Definition Utility.py:712
ObjectWalk(obj, path=(), memo=None)
Definition Utility.py:2045
GetResizedImage(image, size, **kwargs)
Definition Utility.py:1752
SSHPopen(ssh_client, command, verbosity=1, **kwargs)
Definition Utility.py:1614
RemoveDuplicates(ListofStrings, Duplicate="")
Definition Utility.py:1833
ClassWalk(reference, cls)
Definition Utility.py:2002
ProcessWalk(executable, terminate=True)
Definition Utility.py:2074
FileWalk(source, path=os.getcwd())
Definition Utility.py:1976
GetEnvironmentFromCommand(command)
Definition Utility.py:887
GetTemporaryFileName(arg=None, filename="Temp", extension=".cpd", **kwargs)
Definition Utility.py:1039
ArbitraryFlattening(container)
Definition Utility.py:1941
ReplaceTextinFile(filename, outname, replace, inend='', outend='', source=os.getcwd(), **kwargs)
Definition Utility.py:1811
TemporaryDirectory(default=None)
Definition Utility.py:430
GetMergedRepositories(RepoID, RepoList, RepoBranch={}, MergeBranch=[], scratch=os.getcwd(), **kwargs)
Definition Utility.py:1101
GetDockerUI(name="portainer", image="portainer/portainer-ce:latest")
Definition Utility.py:1263
GetDockerEncoding(*args, **kwargs)
Definition Utility.py:1479
ArbitraryEval(expression)
Definition Utility.py:1954