Classes and functions defined for convenience.
----------------------------------------------------------------------------------------------
@author: garb_ma [DLR-FA,STM Braunschweig]
----------------------------------------------------------------------------------------------
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import object
FileNotFoundError = IOError  # Python 2 compatibility alias

import posixpath, ntpath

try: from contextlib import contextmanager
except ImportError: from contextlib2 import contextmanager

from types import MethodType
from .. import PyXMakePath
try: autotest = Make.Coverage.add
except (ImportError, AttributeError):
    # Fallback: a no-op decorator when the coverage utilities are not available.
    def autotest(*args, **kwargs):
        def decorator(func): return func
        return decorator

logger = logging.getLogger(__name__)
    Abstract (lazy) import class that constructs a module whose attributes are only loaded into memory when first accessed.
    It defaults to lazy import behavior.

    @note: Derived from https://stackoverflow.com/questions/77319516/lazy-import-from-a-module-a-k-a-lazy-evaluation-of-variable
        Low-level initialization of parent class.

    def __new__(cls, name, package=None, **kwargs):
        An approximate implementation of import.
        absolute_name = importlib.util.resolve_name(name, package)
        try:
            return sys.modules[absolute_name]
        except KeyError:
            pass
        if '.' in absolute_name:
            parent_name, _, child_name = absolute_name.rpartition('.')
            parent_module = importlib.import_module(parent_name)
            path = parent_module.__spec__.submodule_search_locations
        for finder in sys.meta_path:
            spec = finder.find_spec(absolute_name, path)
            if spec is not None:
                break
        else:
            msg = 'No module named %s' % absolute_name
            raise ModuleNotFoundError(msg, name=absolute_name)
        if kwargs.get("lazy_import", True):
            # Defer execution of the module until its attributes are accessed for the first time.
            loader = importlib.util.LazyLoader(spec.loader)
            spec.loader = loader
        module = importlib.util.module_from_spec(spec)
        sys.modules[absolute_name] = module
        spec.loader.exec_module(module)
        setattr(parent_module, child_name, module)
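# A minimal standalone sketch of the same lazy-loading technique, using only the
# documented standard-library API (the helper name below is illustrative only):
def _lazy_import_sketch(name):
    import importlib.util, sys
    # Resolve the module spec, wrap its loader so execution is deferred until the
    # first attribute access, and register the placeholder in sys.modules.
    spec = importlib.util.find_spec(name)
    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    loader.exec_module(module)
    return module
# Example: json = _lazy_import_sketch("json"); nothing runs until e.g. json.dumps is touched.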
@six.add_metaclass(abc.ABCMeta)
    Parent class for all abstract base classes.

        Low-level initialization of parent class.

        Check if the current base is an abstract base.
        if cls.__bases__[-1].__name__ in [AbstractBase.__name__]:
            raise TypeError("Can't instantiate abstract base class %s." % cls.__name__)
        try:
            return super(AbstractBase, cls).__new__(cls)
        except TypeError:
            return super(ClassWalk(AbstractBase, cls), cls).__new__(cls)
        Recover a derived data class completely from its JSON or dictionary form.

            Subclass instance for initialization

                Initialization of any class instance.
                for k, v in _dictionary.items(): setattr(self, k, v)

        return type(cls.__name__, (Base, cls), {})(dictionary)

        Serializes an arbitrary data class instantiation call. Returns the complete class as JSON.
        realization = cls(*args)
        for x, y in kwargs.items(): getattr(realization, x)(*y if isinstance(y, list) else y)
        if hasattr(realization, "create"): getattr(realization, "create")()
        return realization.jsonify()

        Create a JSON representation of the current class

        Update any given class attribute.
        for k, v in kwargs.items(): setattr(self, k, v)

        Returns a string representation of the current instance.
        return str("%s.%s(%s)") % (type(self).__name__, self.recover.__name__, str(self))

        Prepare an object for JSON (2to3 compatible). Returns a canonical data representation of the current instance.
        return json.dumps(self, default=PrepareObjectforPickling)

        Prepare the object for pickling (2to3 compatible)

        Recover a dictionary from pickling (2to3 compatible)
        self.__dict__.update(_dictobj)
    Recursively find the common ancestor in all bases for a given class and compare them with the supplied base.

    @note: Returns None if no common ancestor can be found
    Abstract method to construct an instance and class method with the same descriptor.

    @note: Derived from https://stackoverflow.com/questions/2589690/creating-a-method-that-is-simultaneously-an-instance-and-class-method

        Construct an instance method.

        Custom descriptor for this class. Returns method either as class or as an instance.
        @functools.wraps(self.method)
        def _wrapper(*args, **kwargs):
            A wrapper calling the given method
            if obj is not None:
                return self.method(obj, *args, **kwargs)
            else:
                return self.method(objtype, *args, **kwargs)
    Context manager for temporarily changing the current working directory.

    @author: Brian M. Hunt

    def __init__(self, newPath):
        self.newPath = os.path.expanduser(newPath)
        if not os.path.exists(self.newPath) and self.newPath != os.getcwd():
            print("==================================")
            print("Creating a new scratch folder @: %s" % self.newPath)
            print("This folder will not be deleted once the job is done!")
            print("==================================")

    def __exit__(self, etype, value, traceback):
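    # Usage sketch (the class name is assumed here for illustration only):
    #   with ChangedWorkingDirectory("~/scratch"):
    #       ...  # work relative to the scratch folder
    #   # the previous working directory is restored on exit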
    Class to convert an arbitrary pickle file (2.x & 3.x) into a readable dictionary.

        Get a dictionary from a *.cpd file.

        @param: self, FileName
        @type: FileName: string
        self.Data = GetDataFromPickle.getDictfromFile(FileName)

        Open a *.cpd file and extract the dictionary stored within.

        @type: FileName: string
        FileIn = open(FileName, "rb")
        Dict = cp.load(FileIn)
    Context manager for updating an existing ZIP archive.

    @author: Marc Garbade

    __binary_empty_archive = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

        Create a new compatible empty archive

        @author: Marc Garbade
        from fastapi import UploadFile
        try: archive = UploadFile(None, filename=filename)
        except TypeError: archive = UploadFile(filename)

    def __init__(self, zipname, zipdata=None, outpath=os.getcwd(), exclude=[], update=True, **kwargs):
        try: self.ZipData = copy.deepcopy(zipdata)
        self.buffer = open(os.path.join(os.getcwd(), self.ZipName), 'wb+')
        shutil.copyfileobj(zipdata, self.buffer)
        if not zipdata or (hasattr(zipdata, "read") and zipdata.read() == b''):
            self.ZipData = tempfile.SpooledTemporaryFile()
        if not os.path.exists(os.path.join(os.getcwd(), self.ZipName)):
            self.buffer = open(os.path.join(os.getcwd(), self.ZipName), 'wb+')
        self.Output = io.BytesIO()
        with zipfile.ZipFile(str(self.ZipName)) as Input:
        self.ExcludeFiles.extend([f for f in os.listdir(os.getcwd()) if os.path.isfile(os.path.join(os.getcwd(), f))])

    def __exit__(self, etype, value, traceback):
        with zipfile.ZipFile(self.Output, "w", zipfile.ZIP_DEFLATED) as Patch:
            for dirpath, _, filenames in os.walk(cwd):
                filepath = os.path.join(dirpath, f)
                arcpath = filepath.split(cwd)[-1]
                Patch.write(filepath, arcpath)
        f.write(self.Output.getvalue())
    Create a temporary directory for use with the "with" statement. Its content is deleted after execution.

    @type: default: string

    def Changed(newdir, cleanup=lambda: True):
        Local helper function to clean up the directory
        prevdir = os.getcwd()
        os.chdir(os.path.expanduser(newdir))

    dirpath = tempfile.mkdtemp(dir=default)
    try: shutil.rmtree(dirpath)
    with Changed(dirpath, cleanup):
    Temporarily set process environment variables.
    old_environ = os.environ.copy()
    os.environ.update(environ)

    os.environ.update(old_environ)
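# Minimal sketch of the restore pattern used above: update os.environ inside the
# block and put the previous state back on exit (the helper name is illustrative).
@contextmanager
def _temporary_environment_sketch(environ):
    old_environ = os.environ.copy()
    os.environ.update(environ)
    try:
        yield
    finally:
        os.environ.clear()
        os.environ.update(old_environ)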
    Redirect console output to a given file.

    def fileno(file_or_fd):
        Small helper function to check the validity of the dump object.
        fd = getattr(file_or_fd, 'fileno', lambda: file_or_fd)()
        if not isinstance(fd, int):
            raise ValueError("Expected a file (`.fileno()`) or a file descriptor")

    Also flush c stdio buffers on python 3 (if possible)
    from ctypes.util import find_library
    libc = ctypes.cdll.msvcrt
    libc = ctypes.cdll.LoadLibrary(find_library('c'))
    libc.fflush(ctypes.c_void_p.in_dll(libc, 'stdout'))
    except (AttributeError, ValueError, IOError):

    stdout_fd = fileno(stdout)
    with os.fdopen(os.dup(stdout_fd), stdout.mode) as copied:
        os.dup2(fileno(to), stdout_fd)
        with open(to, 'wb') as to_file:
            os.dup2(to_file.fileno(), stdout_fd)
        os.dup2(copied.fileno(), stdout_fd)
    Redirect all console outputs to a given stream
    yield (inp, err, out)
    Redirect outputs to a given file.
    if sys.version_info >= (3,4):
        with open(FileName, 'w', encoding="utf-8") as f, contextlib.redirect_stdout(f), MergedConsoleRedirect(sys.stdout):
        with open(FileName, 'w', encoding="utf-8") as f, contextlib.redirect_stdout(f):
    Post a given file as a binary string to a given URL.

    @note: LFS is available if requests_toolbelt is installed.

    try: r = requests.post(url, files={kwargs.pop("kind", 'file'): open(filename, 'rb')}, headers=header, **kwargs)
    except OverflowError:
        from requests_toolbelt.multipart import encoder
        session = requests.Session()
        with open(filename, 'rb') as f:
            form = encoder.MultipartEncoder({"documents": (filename, f, "application/octet-stream"), "composite": "NONE"})
            header.update({"Prefer": "respond-async", "Content-Type": form.content_type})
            r = session.post(url, headers=header, data=form, **kwargs)
    except ImportError: print("The given file is too large. Skipping.")
    except: print("Failed to upload %s." % filename)
@autotest("www.dlr.de")
    Get all URLs present in a given string
    regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
    url = re.findall(regex, s)
    if [x[0] for x in url]:
        return [x[0] for x in url]
    return s.split("href=")[1]
    Get the PyXMake path from *__init__.

    Get the underlying machine platform in lower case.
    return str(platform.system()).lower()

    Get the underlying machine architecture. Returns either x86 or x64, corresponding to 32 or 64 bit systems.
    if struct.calcsize("P") * 8 == 64:
    Return the link target of a symbolic soft link.

    @note: Supports .lnk files from Windows. Returns the target as well as all arguments.

    try: import pywintypes
    except ImportError: sys.path.append(os.path.join(site.getsitepackages()[-1], "pywin32_system32"))
    import win32com.client
    shell = win32com.client.Dispatch("WScript.Shell")
    shortcut = shell.CreateShortCut(path)
    result = delimn.join([shortcut.Targetpath, shortcut.Arguments])
    if not result: result = os.readlink(path)
    Check whether name is on PATH and marked as executable.

    @note: https://stackoverflow.com/questions/11210104/check-if-a-program-exists-from-a-python-script

    kwargs.update({"path": kwargs.get("path",
        kwargs.pop("search_paths", os.pathsep.join([os.getenv("PATH", os.getcwd()), os.getcwd()]) if GetPlatform() != "windows" else None))})
    from whichcraft import which
    if sys.version_info >= (3, 3):
        from shutil import which
    from distutils.spawn import find_executable
    return find_executable(name, **kwargs) is not None
    return find_executable(name, **kwargs) is not None, find_executable(name, **kwargs)
    return which(name, **kwargs) is not None
    return which(name, **kwargs) is not None, which(name, **kwargs)
    Inspect the input variable and return a Boolean value if conversion is possible.

    @Original: https://stackoverflow.com/questions/15008758/parsing-boolean-values-with-argparse

    if isinstance(v, bool):
    if v.lower() in ('yes', 'true', 't', 'y', '1', "on"):
    elif v.lower() in ('no', 'false', 'f', 'n', '0', "off"):
    raise argparse.ArgumentTypeError('Boolean value expected.')
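# Usage sketch with argparse, as suggested by the reference above (the helper
# name GetBoolean is assumed here for illustration):
#   parser = argparse.ArgumentParser()
#   parser.add_argument("--verbose", type=GetBoolean, nargs='?', const=True, default=False)
#   parser.parse_args(["--verbose", "yes"])   # -> Namespace(verbose=True)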
@autotest(autorun=False, stream=sys.stdout, reconfigure=True)
    Initialize a root logger if no settings prior to loading have been found.
    Otherwise, inherit a logger from supported system logging utilities when available.
    Finally, return a reference to the created or already created logger.

    log_format = kwargs.pop("format", None)
    log_stream = kwargs.pop("stream", None)
    log_level = kwargs.pop("level", logging.NOTSET)
    log_overwrite = kwargs.pop("overwrite", log_name is not None)

        if log_overwrite: log_level = logging.getLogger().handlers[0].level
    except IndexError:
        pass

        from fa_pyutils.service.logger import MyLogger as getLogger
        logger = getLogger(log_name)
        logger._setLogLevel(log_level)
        if kwargs.pop("user", getattr(sys, "frozen", False)): raise ImportError
        logger.handlers.clear()
        handle = logging.StreamHandler()
        log_format = logger.formatter._fmt.split()
        if log_name: log_format.insert(1, '%(name)s')
        log_format = delimn.join(log_format)
        handle.setFormatter(logging.Formatter(log_format))

        logger = logging.getLogger(__name__)
        handle = logging.StreamHandler(log_stream)

    logger.setLevel(log_level)
    logger.addHandler(handle)

    if kwargs.pop("reconfigure", len(logging.getLogger().handlers) < 1 and log_overwrite):
        logging.basicConfig(level=log_level, format=log_format, stream=log_stream, **kwargs)

    if log_name and log_name not in logging.root.manager.loggerDict:
        logging.root.manager.loggerDict.update({log_name: logger})

    return logging.getLogger(log_name)
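# Typical call (GetLogger is the name aliased via setattr at the end of this
# module; assuming the first positional argument is the logger name):
#   log = GetLogger("PyXMake", level=logging.INFO, stream=sys.stdout)
#   log.info("Logger is ready")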
    Create a list of required sub-packages for a given Python project (given as full directory path).

    from packaging import version
    from tempfile import NamedTemporaryFile
    from pipreqs.pipreqs import main as listreqs

    tmp = NamedTemporaryFile(mode="r+", suffix=".txt", encoding="utf-8", delete=False).name
    command = ["--force", "--savepath", tmp, "--encoding", "utf-8"]

    command.extend(args); command.append(dirname)

    if version.parse(pipreqs.__version__) >= version.parse("0.4.11") and "--no-pin" in command:
        command.remove("--no-pin"); command = ["--mode", "no-pin"] + command

    restored = copy.deepcopy(sys.argv); sys.argv = sys.argv[0:1] + command; listreqs(); sys.argv = restored

    target_url = 'file:' + urllib.request.pathname2url(tmp)

    data = urllib.request.urlopen(target_url).read().decode('utf-8').replace(os.linesep, " ").split()

        from stdlib_list import stdlib_list
        data = [str(item) for item in data if not any([x.startswith(str(item)) for x in stdlib_list()])]
    except ImportError:
        pass

    responses = [(item, requests.get("https://pypi.python.org/pypi/%s/json" % str(item).split("==")[0])) for item in data]
    try: data = [str(item) for item, response in responses if str(item).split("==")[1] in response.json()["releases"]]
    except: data = [str(item) for item, response in responses if response.status_code == 200]

        response = dict(requests.get("https://pypi.python.org/pypi/%s/json" % str(item).split("==")[0]).json())
        try: diff = abs(int(datetime.date.today().year) - int(response["releases"][response["info"]["version"]][0]["upload_time_iso_8601"].split("-")[0]))
            data.remove(item); continue
        if diff >= int(kwargs.get("check_updated", 5)): data.remove(item)

    except FileNotFoundError:
    Return the host IP address (IP address of the machine executing this code)
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(('10.255.255.255', 1))
    IP = s.getsockname()[0]
    Return the WSL IP address (IP address of the machine executing this code)

        Heuristic to detect if Windows Subsystem for Linux is available.

        @source: https://www.scivision.dev/python-detect-wsl/
        wsl = shutil.which("wsl")
        ret = subprocess.run(["wsl", "test", "-f", "/etc/os-release"])
        return ret.returncode == 0

    if hasWSL():
        return subprocess.check_output(["bash", "-c", "ifconfig eth0 | grep 'inet '"]).decode().split("inet ")[-1].split(" netmask")[0]
    Returns platform independent paths extracted from a given command or list

    def sanitize(expression, **kwargs):
        Validate a given pair
        except kwargs.get("allowed_exceptions", (ValueError, SyntaxError)) as _:

    data = [os.path.normpath(sanitize(x, **kwargs)) for x in data]

    if kwargs.get("is_path", True):
        result = [os.path.abspath(x) if os.path.exists(os.path.abspath(x))
                  else os.path.abspath(os.path.join(os.getcwd(), x)) for x in data]
    Returns the active environment from a process after a given command is executed.

    def validate_pair(ob):
        Validate a given pair
        if not (len(ob) == 2):
            raise RuntimeError("Unexpected result: %s" % ob)

    def consume(iterable):
        Iterate over a given stream of inputs
            while True: next(iterable)
        except StopIteration:
            pass

    delimn = " %s " % "&&" if GetPlatform() in ["windows"] else ";"
    printenv = "set" if GetPlatform() in ["windows"] else "printenv"
    tag = 'Done running command'

    command = delimn.join([command, 'echo "%s"' % tag, printenv])

    proc = Popen(command, raw=True)

    consume(itertools.takewhile(lambda l: tag not in l, lines))

    handle_line = lambda l: l.rstrip().split('=', 1)

    pairs = map(handle_line, lines)
    valid_pairs = filter(validate_pair, pairs)

    result = dict(valid_pairs)
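    # The parsing above reduces to splitting each "KEY=value" line once at the
    # first '=' sign, e.g. (pure standard-library illustration):
    #   dict(line.split("=", 1) for line in ["PATH=/usr/bin", "LANG=C.UTF-8"])
    #   -> {'PATH': '/usr/bin', 'LANG': 'C.UTF-8'}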
    Return the local DNS IP address.
    dns_resolver = dns.resolver.Resolver()
    return dns_resolver.nameservers
@autotest(default = True)
    Get the latest version of OpenAPI generator (requires Internet connection).

    import urllib.request
    from lxml import html
    from packaging.version import Version, parse

    if not output: path = os.path.join(PyXMakePath, "Build", "bin", "openapi")

    if not os.path.exists(path) or not os.listdir(path) and kwargs.get("silent_install", True):
        base = "https://repo1.maven.org/maven2/org/openapitools/openapi-generator-cli"
        page = requests.get(base)
        page.raise_for_status()
        webpage = html.fromstring(page.content)
        version = sorted([x[:-1] for x in webpage.xpath('//a/@href') if str(x[:-1][0]).isdigit() and isinstance(parse(x[:-1]), Version)], reverse=True)[0]
        url = posixpath.join(base, version, "openapi-generator-cli-%s.jar" % kwargs.get("version", version))
        os.makedirs(path, exist_ok=True)
        urllib.request.urlretrieve(url, os.path.join(path, PathLeaf(url)))
        except:
            raise ConnectionError("Downloading latest OpenAPI generator client failed.")

    if os.listdir(path): result = os.path.join(path, sorted([x for x in os.listdir(path)], reverse=True)[0])
    Perform a full text search using Google's Books API.

    @note: Uses requests when available; falls back to urllib if requests is not found.

    try: import requests
    except ImportError: import urllib.request as requests

        with requests.urlopen(base_api_link) as f: text = f.read()
        result = text.decode("utf-8")
    except AttributeError: result = requests.get(base_api_link).text

    google_api_search = posixpath.join("https:", "", " ", "www.googleapis.com", "books", "v1", "volumes")
    base_api_link = posixpath.join(google_api_search, "?q=").replace(" ", "") + urllib.parse.quote(str(fulltext))

    decoded_text = get(base_api_link)

    obj = json.loads(decoded_text)

    base_api_link = posixpath.join(google_api_search, obj["items"][0]["id"]).replace(" ", "")

    decoded_text = get(base_api_link)
    JSON = json.loads(decoded_text)

    if verbose >= 1: print("No matching entry found")
    Create a temporary file name with extension *.cpd by default. Optional argument: Seed for random number generation.
    if isinstance(arg, six.integer_types):
        randTempInteger = rd.randint(1, 1000)
        TempFileName = filename + str(randTempInteger) + kwargs.get("ending", extension)

    randTempInteger = rd.randint(1, 1000)
    TempFileName = filename + str(randTempInteger) + kwargs.get("ending", extension)
@autotest("'%s'" % site.getusersitepackages())
    Return the given absolute path in its Linux/Windows counterpart.

    current_os = target_os

    if path[0] == path[-1] and path[0] in ["'", '"']:

    if current_os.lower() == "linux":
        target = posixpath; pristine = ntpath
        target = ntpath; pristine = posixpath

    if os.path.splitdrive(path)[0]:
        converted_path = target.join(AsDrive(os.path.splitdrive(path)[0][0], sep=target.sep), *os.path.splitdrive(path)[-1].replace(pristine.sep, target.sep).split(target.sep)[1:])
        converted_path = ntpath.abspath(path)

    if len(path.split(pristine.sep)) == 1: pristine = target
    if kwargs.get("use_linux_drive", True): converted_path = target.join(AsDrive(path.split(pristine.sep)[1:][0], sep=target.sep), *path.split(pristine.sep)[2:])

    converted_path = path

    path = quote + path + quote
    converted_path = quote + converted_path + quote
    return converted_path
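# Hedged example of the conversion above: with a Linux target, a Windows path
# such as "C:\\Users\\demo" has its drive mapped to "/C" via AsDrive and becomes
# "/C/Users/demo"; converting in the opposite direction restores a drive-rooted
# Windows path. Surrounding quotes on the input are preserved on the output.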
    Walk through an iterable input set and store the results in a list.
    for i in range(0, len(Iterable)):
        AsList.append(Iterable[i])
    Walk through an iterable input set and store the results in a list.

    import git_filter_repo
    raise NotImplementedError

    repo_branch = RepoBranch
    if not MergeBranch: merge_branch = ["master" for _ in RepoList]

    if len(merge_branch) != len(RepoList):
        raise NotImplementedError

    merge_branch = iter(merge_branch)

    if not os.path.exists(RepoID):
        g = git.Repo.init(RepoID)
    g = git.Repo(RepoID)

    for repo in RepoList:
        add_repo = repo[0]; add_package = repo[1]
        try: add_subfolder = repo[2]
        except: add_subfolder = ""
        add_branch = next(merge_branch)
        _ = git.Repo.clone_from(add_repo, add_package)
        command = " ".join([sys.executable, git_filter_repo.__file__, "--to-subdirectory-filter", add_subfolder])
        _.git.execute(shlex.split(command, posix=not os.name.lower() in ["nt"]))
        g.git.remote("add", add_package, "../" + add_package)
        g.git.fetch(add_package, "--tags")
        g.git.merge("--allow-unrelated-histories", url_delimn.join([add_package, add_branch]))
        g.git.remote("remove", add_package)

    for repo in RepoList:
        if not repo_branch:
            break
        try: add_subfolder = repo[2]
        except: add_subfolder = ""
        g.git.switch(master)
        g.git.checkout(master, b=repo_branch[add_subfolder])
        command = " ".join([sys.executable, git_filter_repo.__file__, "--force", "--subdirectory-filter", add_subfolder + "/", "--refs", repo_branch[add_subfolder]])
        g.git.execute(shlex.split(command, posix=not os.name.lower() in ["nt"]))
        g.git.switch(master)

    if not kwargs.get("keep_subfolder", True):
        for repo in RepoList:
            if not repo_branch:
                break
            try: add_subfolder = repo[2]
            DeleteRedundantFolders(os.path.join(os.path.abspath(os.getcwd()), RepoID, add_subfolder), ignore_readonly=True)

    g.git.commit("-m", "Added branches")
    Check whether a given container is currently active and running. Returns a boolean value.

    print("==================================")
    print("docker-compose not found. Please install Docker")
    print("==================================")

    command = " ".join(["docker", "ps", "--filter", "name=" + container])
    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=not IsDockerContainer())
    logs, _ = p.communicate()

    return not logs.decode(encoding, errors='ignore').replace('\n', '').endswith("NAMES")
    Check whether a given registry is currently active and running. If not, start a local Docker registry at the given port and name.
    Optionally, secure the connection by an HTTPS certificate (PEM) and alter the base image.

    print("==================================")
    print("Docker executable not found. Please install Docker")
    print("==================================")

    print("==================================")
    print("Container is already running")
    print("==================================")

    command = ["docker", "run", "-d", "--restart=always", "--name", str(registry), "-e", "REGISTRY_HTTP_ADDR=0.0.0.0:" + str(port), "-p", ":".join([str(port)] * 2)]

    if all([https_keypath, https_pemfile]):
        command.extend(["-v", str(https_keypath) + ":/certs", "-e", "REGISTRY_HTTP_TLS_CERTIFICATE=/certs/" + str(https_pemfile),
                        "-e", "REGISTRY_HTTP_TLS_KEY=/certs/" + str(https_pemfile)])

    command.extend([kwargs.get("registry_base", "registry:latest")])

    p = subprocess.check_call(command, shell=True)

    command = ["docker", "run", "-d", "--restart=always", "--name", str(registry) + "_ui", "-p", kwargs.get("ui_port", str(int(str(port)) + 50)) + ":80"]
    command.extend(["-e", "REGISTRY_HOST=" + str(GetHostIPAddress()), "-e", "REGISTRY_PORT=" + str(port)])

    if all([https_keypath, https_pemfile]): command.extend(["-e", "REGISTRY_PROTOCOL=https"])

    command.extend(["-e", "SSL_VERIFY=false", "-e", "ALLOW_REGISTRY_LOGIN=true", "-e", "REGISTRY_ALLOW_DELETE=true",
                    "-e", "REGISTRY_PUBLIC_URL=" + ":".join([str(socket.gethostname()), str(port)]), kwargs.get("ui_base", "parabuzzle/craneoperator:latest")])

    subprocess.check_call(command, shell=True)
def GetDockerUI(name="portainer", image="portainer/portainer-ce:latest"): # pragma: no cover

    Create a custom web UI for docker using Portainer.

    print("==================================")
    print("Docker executable not found. Please install Docker")
    print("==================================")

    print("==================================")
    print("Container is already running")
    print("==================================")

    command = ["docker", "run", "-d"]
    command.extend(["--name=%s" % name, "--restart=always"])
    command.extend(["-p", "8000:8000", "-p", "9000:9000", "-p", "9443:9443"])
    command.extend(["-v", "/var/run/docker.sock:/var/run/docker.sock", "-v", "portainer_data:/data", image])

    subprocess.check_call(command, shell=not GetPlatform() in ["linux"])
def GetDockerRunner(runner="gitlab-runner", restart_policy="always", image="", token="", executor="shell",
                    url='https://gitlab.dlr.de/', tags="docker,linux", ssl_verify=True, **kwargs): # pragma: no cover

    Check whether a given GitLab runner is active and running. If not, create and register a local runner with the given settings.
    Optionally, write the setup to a standalone shell script and alter the base image.

    if kwargs.get("as_script", "") and os.path.exists(os.path.dirname(kwargs.get("as_script", ""))):

        # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
        # % Shell script for Docker/Linux (x64) %
        # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
        # Shell script for creating a GitLab runner programmatically
        # Created on 22.04.2022
        # -------------------------------------------------------------------------------------------------
        # Requirements and dependencies:
        # - Created // mg 22.04.2022
        # -------------------------------------------------------------------------------------------------
        # Process through all command line arguments (if given).
            -d=*|--description=*)
            description="${i#*=}"
        # Evaluate user settings
        url=${url:-"https://gitlab.dlr.de/"}
        system=${system:-"local"}
        image=${image:-"harbor.fa-services.intra.dlr.de/dockerhub/gitlab/gitlab-runner:latest"}
        executor=${executor:-"docker"}
        base=${base:-"alpine:latest"}
        description=${description:-"fa-docker"}
        locked=${locked:-"false"}
        return=${return:-"true"}
        command=$( echo "docker run --rm -v gitlab-runner-config:/etc/gitlab-runner ${image}" )
        if [ "$system" != "dind" ]; then
            download="https://packages.gitlab.com/install/repositories/runner/gitlab-runner"
            command=$( echo "gitlab-runner" )
            # Debian, Ubuntu and Mint
            ( command -v apt >/dev/null 2>&1 ; ) && ( curl -L "$download/script.deb.sh" | bash ) && ( apt-get install gitlab-runner ) ;
            # RHEL, CentOS and Fedora
            ( command -v yum >/dev/null 2>&1 ; ) && ( curl -L "$download/script.rpm.sh" | bash ) && ( yum install gitlab-runner ) ;
        docker volume create gitlab-runner-config
        docker run -d --name gitlab-runner --restart always -v gitlab-runner-config:/etc/gitlab-runner -v /var/run/docker.sock:/var/run/docker.sock --env TZ=DE -i ${image}
        $command register --non-interactive --executor ${executor} --docker-image ${base} --url ${url} --registration-token ${token} --description ${description} --tag-list ${executor} --run-untagged=true --locked=${locked} --access-level=not_protected --docker-privileged --docker-volumes /certs/client
        # Create a cron job deleting all leftover data in regular intervals.
        echo 'docker system prune --all --volumes --force' > cron.sh
        # Resume to main shell. Deactivate in Docker to keep the container running endlessly.
        if [ "$return" = "false" ]; then tail -f /dev/null; fi

        path = kwargs.get("as_script")
        with open(path, "w") as f: f.write(textwrap.dedent(script))
        print("==================================")
        print("Writing GitLab Runner Runtime script to: ")
        print("==================================")
        return os.path.abspath(path)
    print("==================================")
    print("Docker executable not found. Please install Docker")
    print("==================================")

    socket = "/var/run/docker.sock:/var/run/docker.sock"
    if GetPlatform() == "windows": socket = socket[0] + socket

    subprocess.check_call(["docker", "volume", "create", runner + "-config"], shell=not GetPlatform() in ["linux"])

    command = ["docker", "run", "-d", "--name", runner, "--restart", restart_policy, "-v", runner + "-config" + ":/etc/gitlab-runner", "-v", socket, "--env", "TZ=DE",
               "-i", "gitlab/gitlab-runner:latest"]

    subprocess.check_call(command, shell=not GetPlatform() in ["linux"])

    subprocess.check_call(["docker", "tag", "gitlab/gitlab-runner:latest", image], shell=not GetPlatform() in ["linux"])
    subprocess.check_call(["docker", "image", "rm", "gitlab/gitlab-runner:latest"], shell=not GetPlatform() in ["linux"])

    _run_image = "gitlab/gitlab-runner:latest"

    command = ['docker', 'run', '--rm', '-v', runner + "-config" + ":/etc/gitlab-runner", _run_image, 'register', '--non-interactive', '--executor', executor, '--docker-image', 'alpine:latest',
               '--url', url, '--registration-token', token, '--description', str(runner), '--tag-list', tags, '--run-untagged=true', '--locked=false', '--access-level=not_protected',
               '--docker-privileged']

    if ssl_verify: command.extend(["--docker-volumes", "/certs/client"])
    subprocess.check_call(command, shell=not GetPlatform() in ["linux"])

    print("==================================")
    print("Container is already running")
    print("==================================")
    Get all active Docker ports. Either from a given container, by a list of containers or all active containers.
    Defaults to all active containers. Supports WSL2 with Docker installed.

    print("==================================")
    print("Docker executable not found. Please install Docker")
    print("==================================")

    if not docker.lower().endswith(".bat"): docker = "docker"

    data = []; ports = []

    active_container = kwargs.get("container", " ".join(subprocess.check_output((["sudo"] if IsDockerContainer() else []) + [docker, "ps", "--quiet"]).decode("utf-8").split()).split("--quiet")[-1].split())
    if isinstance(active_container, str): active_container = list(active_container)

    for x in active_container: data.extend(" ".join(subprocess.check_output((["sudo"] if IsDockerContainer() else []) + [docker, "port", x]).decode("utf-8").split()).split(x)[-1].split("->"))

    port = (x.split(":")[-1] if ":" in x else "").split(" ")[0]

    return list(set(ports))
@autotest("user","password")
    Creates a base64 encoded string of a given username and password combination used for Docker authentication.

    @param: username, password

    message = str(args[0])

    if len(args) >= 1: message = kwargs.get("delimn", ":").join([str(x) for x in args])

    message_bytes = message.encode(kwargs.get("encoding", 'utf-8'))
    base64_bytes = base64.b64encode(message_bytes)
    base64_message = base64_bytes.decode(kwargs.get("encoding", 'utf-8'))
    return base64_message
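# Worked example of the encoding above: the pair ("user", "password") is joined
# with ":" and base64-encoded, so "user:password" becomes "dXNlcjpwYXNzd29yZA==":
#   base64.b64encode("user:password".encode("utf-8")).decode("utf-8")
#   -> 'dXNlcjpwYXNzd29yZA=='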
    Check whether current package lives inside a docker container.
    path = '/proc/self/cgroup'
    return (os.path.exists('/.dockerenv') or os.path.isfile(path) and any('docker' in line for line in open(path))) or GetExecutable("cexecsvc")
    Return the given string in quotes.
    return str(quote) + str(s) + str(quote)

    Check whether a string is empty and/or not given. Returns True otherwise.
    return bool(s and s.strip())
@autotest("3551551677")
    Check whether a given string corresponds to a valid ISBN (10-digit) number.

    isbn = isbn.replace("-", "").replace(" ", "").upper()
    match = re.search(r'^(\d{9})(\d|X)$', isbn)

    digits = match.group(1)
    check_digit = 10 if match.group(2) == 'X' else int(match.group(2))

    result = sum((i + 1) * int(digit) for i, digit in enumerate(digits))

    return (result % 11) == check_digit
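# Worked check for the autotest value "3551551677": the weighted sum over the
# first nine digits is 1*3 + 2*5 + 3*5 + 4*1 + 5*5 + 6*5 + 7*1 + 8*6 + 9*7 = 205,
# and 205 % 11 == 7, which matches the final check digit, so the ISBN is valid.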
    Detect if the script is running inside WSL or WSL2 on windows.

    @note: WSL is thought to be the only common Linux kernel with Microsoft in the name, per Microsoft:
    https://github.com/microsoft/WSL/issues/4071#issuecomment-496715404

    @author: https://www.scivision.dev/python-detect-wsl/
    return 'Microsoft' in platform.uname().release
    Return s as drive to start an absolute path with path.join(...).
    if sep != ntpath.sep:
        drive = posixpath.join(posixpath.sep, s)
    drive = ntpath.join(s + ":", ntpath.sep)
def Popen(command, verbosity=1, encoding="utf-8", **kwargs):

    Run command line string "command" in a separate subprocess.
    Show output in current console window in dependence of verbosity level:
    - 1 --> Only show errors
    - 2 --> Show every command line output.

    @param: command, verbosity
    @type: string, integer

    shell = kwargs.get("shell", not IsDockerContainer() or isinstance(command, six.string_types))

    if shell and not kwargs.get("collect", True):
        p = subprocess.check_call(command)
    p = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, shell=shell, env=kwargs.get("env", os.environ.copy()), universal_newlines=kwargs.get("raw", False))

    if kwargs.get("raw", False):

    elif kwargs.get("collect", True):
        stdout, stderr = p.communicate()
        print((stdout.decode(encoding, errors='ignore').replace(kwargs.get("replace", "\n"), '')))
        print((stderr.decode(encoding, errors='ignore').replace(kwargs.get("replace", "\n"), '')))

        stdout = p.stdout.readline()
        if not IsNotEmpty(stdout) and p.poll() is not None:
        sys.stdout.write(stdout.decode(encoding, errors='ignore').replace(kwargs.get("replace", "\n"), ''))

    _, stderr = p.communicate()
    sys.stderr.write(stderr.decode(encoding, errors='ignore').replace(kwargs.get("replace", "\n"), ''))
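# Usage sketch for the wrapper above (command strings are illustrative only):
#   Popen("echo Hello PyXMake", verbosity=2)   # echoes the collected output
#   Popen("python --version", verbosity=1)     # only errors are printed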
def SSHPopen(ssh_client, command, verbosity=1, **kwargs):

    Run command line string "command" in a separate SSH client process.
    Show output in current console window in dependence of verbosity level:
    - 1 --> Only show errors
    - 2 --> Show every command line output.

    @param: command, verbosity
    @type: string, integer

    _, stdout, stderr = ssh_client.exec_command(command, get_pty=kwargs.get("tty", False))

    if not kwargs.get("collect", True) and verbosity >= 1:
        for line in iter(lambda: stdout.read(2048).decode('utf-8', 'ignore'), ""):
            _ = line; eval('print(line, end="")')
        return stdout.channel.recv_exit_status()

    sexit = stdout.channel.recv_exit_status()

    sout = stdout.readlines()
    serr = stderr.readlines()

    if verbosity >= 2 and sout:
        print("".join(sout))
    if verbosity >= 1 and serr:
        print("".join(serr))
    Utility function to convert a given *XLSX file to an *XLS.

    if not outdir: outdir = os.getcwd()
    excel = os.path.abspath(excel_file)

    ps = ".".join([str(next(tempfile._get_candidate_names())).replace("_", ""), "ps1"])

    subprocess.check_call(["sudo", "apt-get", "update"])
    subprocess.check_call(["sudo", "apt-get", "install", "-y", "libreoffice"])
    except:
        raise NotImplementedError

    else: command = ["libreoffice", "--headless", "--convert-to", "xls", PathLeaf(excel)]
    else: command = ["powershell.exe", '.\\' + ps, PathLeaf(excel)]

    shutil.copyfile(excel, xlsx)

    with open(ps, 'w') as f:
        f.write(textwrap.dedent('''\
            $myDir = split-path -parent $MyInvocation.MyCommand.Path
            $excelFile = "$myDir\\" + $File
            $Excel = New-Object -ComObject Excel.Application
            $wb = $Excel.Workbooks.Open($excelFile)
            $out = "$myDir\\" + (Get-Item ("$myDir\\" + $File) ).Basename + ".xls"
            $Excel.DisplayAlerts = $false;
            $wb.SaveAs($out, 56)

    subprocess.check_call(command, env=os.environ.copy()
                          if not GetPlatform() in ["linux"]
                          else dict(os.environ, LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libfreetype.so.6"))
    shutil.copyfile(xlsx.replace(".xlsx", ".xls"), os.path.join(outdir, xlsx.replace(".xlsx", ".xls")))

    return os.path.join(outdir, xlsx.replace(".xlsx", ".xls"))
def CreateArchive(output, source=None, exclude=[".git",".svn","__pycache__"], **kwargs):

    Create an archive from the given source directory. Defaults to the current project.

    @param: output - Absolute output path
    @param: source - Source path. Defaults to the current project.
    @param: exclude - Files and directories to be ignored

    def tar_exclude(tarinfo):
        Exclude pattern for tar
        local = set(tarinfo.name.split(posixpath.sep))
        if local.intersection(set(exclude)):
            return None

    if source: from_source = source

    output_filename = os.path.abspath(output)
    source_dir = os.path.abspath(from_source)

    if PathLeaf(output_filename).split(".")[1].lower() in ["zip"]:
        archive = zipfile.ZipFile(output_filename, "w", zipfile.ZIP_DEFLATED)
        rootlen = len(source_dir) + 1
        for dirpath, _, filenames in os.walk(source_dir):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                parentpath = os.path.relpath(filepath, source_dir)
                if parentpath.startswith((".")) or any(PathLeaf(filepath) == x for x in exclude):
                    continue
                if any(x in filepath.replace(PathLeaf(filepath), "") for x in exclude):
                    continue
                archive.write(filepath, filepath[rootlen:])

    compression = PathLeaf(output_filename).split(".")[-1]
    archive = tarfile.open(output_filename, "w" + "%s" % (":" + compression if compression in ["gz", "bz2", "xz"] else ""))
    archive.add(source_dir, arcname=os.path.basename(source_dir), filter=tar_exclude)

    return os.path.exists(output)
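# Usage sketch (paths are illustrative): archive the current project either as a
# ZIP file or as a compressed tarball, depending on the output file extension:
#   CreateArchive(os.path.join(os.getcwd(), "project.zip"))
#   CreateArchive(os.path.join(os.getcwd(), "project.tar.gz"), source=os.getcwd())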
    Resize a given image (absolute path) and return a resized image object.

    @param: source - Source path. Must be an absolute path to the icon
    @param: size - Single integer value representing the new icon size

    from PIL import Image, ImageOps, ImageChops

        Utility function to check for the existence of an image border
        bg = Image.new(im.mode, im.size, im.getpixel((0,0)))
        diff = ImageChops.difference(im, bg)
        diff = ImageChops.add(diff, diff, 2.0, -100)
        bbox = diff.getbbox()
        return all((bbox[0], bbox[1], (bbox[0] + bbox[2]) <= im.size[0], (bbox[1] + bbox[3]) <= im.size[1]))

    base_image = copy.deepcopy(image); base_width = size
    base_offset = int(max(2, kwargs.get("base_offset", 20)))

    icon = Image.open(base_image)

    base_padding = not hasPadding(icon) and base_offset >= 0

    if base_padding: base_width -= base_offset

    wpercent = (base_width / float(icon.size[0]))
    hsize = int((float(icon.size[1]) * float(wpercent)))

    try: icon = icon.resize((base_width, hsize), Image.ANTIALIAS)
    except: icon = icon.resize((base_width, hsize), Image.Resampling.LANCZOS)

    if base_padding: icon = ImageOps.expand(icon, border=(int(base_offset//2),)*4, fill=icon.getpixel((0,0)))
    Concatenate all files into one.

    with open(filename, 'wb') as wfd:
        for f in [os.path.join(source, cs) if IsNotEmpty(str(os.path.splitext(cs)[1]))
                  else os.path.join(source, cs + ending) for cs in files]:
            if not os.path.exists(f):
                continue
            wfd.write("\n\n".encode())
            with open(f, 'rb') as fd:
                shutil.copyfileobj(fd, wfd, 1024*1024*10)
def ReplaceTextinFile(filename, outname, replace, inend='', outend='', source=os.getcwd(), **kwargs):

    Replace all occurrences of replace in filename.

    Inputfile = os.path.join(source, filename + inend)
    Outputfile = outname + outend

    with io.open(Inputfile, errors=kwargs.get("error_handling", "replace"), encoding="utf-8") as infile, io.open(Outputfile, 'w', encoding="utf-8") as outfile:
            for src, target in replace.items():
                line = line.replace(src, target)

    with open(Inputfile) as infile, open(Outputfile, 'w') as outfile:
            for src, target in replace.items():
                line = line.replace(src, target)
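# Usage sketch: copy a file while substituting placeholders (the file names and
# the placeholder key below are illustrative only):
#   ReplaceTextinFile("template", "settings.cfg", {"%version%": "1.0"}, inend=".cfg")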
@autotest([site.getuserbase(),site.getusersitepackages()]*2, site.getuserbase())
    Remove all duplicates in a list of strings

    count = 0; result = []

    var = " ".join(ele.split())

    if var == str(Duplicate):
        result.append(str(Duplicate))
        result.append(var.strip())
    Delete all files from workspace

    @author: Marc Garbade, 26.02.2018

    @param: identifier: A tuple specifying the files to remove.

    for f in os.listdir(os.getcwd()):
        if f.endswith(identifier):

    Delete all redundant folders from the current workspace

    def _IgnoreReadOnly(action, name, exc):
        Delete read-only folders as well
        os.chmod(name, stat.S_IWRITE)

    pattern = os.path.join(os.getcwd(), Identifier)

    for item in glob.glob(pattern):
        if not os.path.isdir(item):
            continue
        directory = r'\\?\ '.strip() + item
        shutil.rmtree(directory, onerror=_IgnoreReadOnly)
        shutil.rmtree(directory)
    Bind a function to an existing object.
    return MethodType(_func, _obj)
@autotest(type('Class', (object,), {"data": np.array([0, 1, 2, 3])}))
    Prepare an object for pickling and convert all numpy
    arrays to python defaults (2to3 compatible).

    _dictobj = _obj.__dict__.copy()
    _dictobj['__np_obj_path'] = []

    if isinstance(value, (np.ndarray, np.generic)):
        for step in path[:-1]:
            parent = parent[step]
        parent[path[-1]] = value.tolist()
        _dictobj['__np_obj_path'].append(path[-1])
@autotest({"data": [[0, 1],[2, 3]], "__np_obj_path": ["data"]})
    Restore the original dictionary by converting python defaults to their
    numpy equivalents if required (2to3 compatible).

    _dictobj = _dict.copy()

    for key, value in _dictobj.items():
        if key in _dictobj.get('__np_obj_path', []):
            _dictobj[key] = np.float64(value)
            if len(np.shape(value)) == 2:
                _dictobj[key] = np.asmatrix(value)
    Return the last item of an arbitrary path (its leaf).
    head, tail = ntpath.split(path)
    return tail or ntpath.basename(head)
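# Examples of the leaf extraction above (ntpath handles both path separators):
#   PathLeaf("/tmp/folder/file.txt")   -> "file.txt"
#   PathLeaf("/tmp/folder/")           -> "folder"   (trailing separator)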
    Restore the original dictionary by converting Python defaults to their
    numpy equivalents if required (2to3 compatible).

    if isinstance(i, (list, tuple, np.ndarray)):
@autotest('{"data": True}')
    Evaluate a given expression using ast.literal_eval while treating every string as raw.

    node_or_string = expression
    if isinstance(expression, str): node_or_string = r"%s" % expression if GetPlatform() in ["windows"] else r"'%s'" % expression
    return ast.literal_eval(node_or_string)
    Move given src to defined dest while waiting for completion of the process.

    p = Popen(" ".join([copy, src, dest]), verbosity=verbose)
    Walk recursively through path. Check if all files listed in source are present.
    If True, return them. If False, return all files present in the given path.

    files = list([]); request = source

    if not isinstance(request, (list, tuple,)): request = [request]

    InputFiles = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]

    if all(np.array([len(request), len(InputFiles)]) >= 1):
        if set([os.path.splitext(x)[0] for x in request]).issubset(set([os.path.splitext(f)[0] for f in InputFiles])):
            files.extend(request)
        files.extend(InputFiles)
    files.extend(InputFiles)
@autotest(object, AbstractBase)
    Recursively find a class object by name and return its object from another class.

    @note: Returns None if reference cannot be resolved. Can be safely used with AbstractBase classes and reload.

    for x in getattr(cls, "__bases__"):
        if x.__name__ in [reference.__name__]:
            result = copy.deepcopy(x)
@autotest(site.getusersitepackages(), contains="DLR")
def PathWalk(path, startswith=None, endswith=None, contains=None, **kwargs):

    Walk recursively through path. Exclude both folders and files if requested.

    def Skip(x, starts, contains, ends):
        Evaluate skip condition
        if isinstance(starts, (six.string_types, tuple, list)):
            if x.startswith(starts if not isinstance(starts, list) else tuple(starts)):
        if isinstance(contains, (six.string_types, tuple, list)):
            tmp = list([]); tmp.append(contains)
            if any(s in x for s in tmp):
        if isinstance(ends, (six.string_types, tuple, list)):
            if x.endswith(ends if not isinstance(ends, list) else tuple(ends)):

    for root, dirs, files in os.walk(os.path.normpath(path)):
        if kwargs.get("exclude", any([startswith, endswith, contains])):
            files = [f for f in files if not Skip(f, startswith, contains, endswith)]
            dirs[:] = [d for d in dirs if not Skip(d, startswith, contains, endswith)]
        yield root, dirs, files
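# Usage sketch: iterate over a directory tree while filtering hidden entries and
# caches (the filter values below are illustrative only):
#   for root, dirs, files in PathWalk(os.getcwd(), startswith=".", contains="__pycache__"):
#       ...  # matching folders and files are already excluded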
@autotest(AbstractMethod)
    Walk recursively through nested python objects.

    @author: Yaniv Aknin, 13.12.2011

    @param: obj, path, memo
    @type: object, list, boolean

    string_types = six.string_types
    iteritems = lambda mapping: getattr(mapping, 'iteritems', mapping.items)()

    if isinstance(obj, dict):
        iterator = iteritems
    elif isinstance(obj, (list, set, tuple)) and not isinstance(obj, string_types):
        iterator = enumerate

    if id(obj) not in memo:
        for path_component, value in iterator(obj):
            for result in ObjectWalk(value, path + (path_component,), memo):
        memo.remove(id(obj))
@autotest(sys.executable, terminate=False)
    Walk through all active processes on the host machine,

    for proc in psutil.process_iter():
        if os.getpid() == proc.pid:
        for key, value in proc.as_dict().items():
            if executable in str(key) or executable in str(value):
                proc_delimn = " "; processName = proc.name(); processID = proc.pid
                if executable in str(processName):
                    if terminate: proc.kill()
                    print("==================================")
                    print("Found SSH process @ " + str(proc_delimn.join([str(processName), ':::', str(processID)])))
                    if terminate: print("The process is terminated.")
                    print('==================================')
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
setattr(sys.modules[__name__], "setLogger", getattr(sys.modules[__name__], "GetLogger"))
setattr(sys.modules[__name__], "getLogger", getattr(sys.modules[__name__], "GetLogger"))
if __name__ == '__main__':
Module containing all relevant modules and scripts associated with the building process.