# Automation with Python and STAF/STAX


The reader should note that the solution is only intended to explain how Python and STAF may be used. No claim is made that the solution presented here is the best one in any way, just that it is one more option that the reader may consider in future developments.

# The Problem

Let’s imagine that we have a computer network in which a machine periodically generates some kind of file with information that is of interest to other machines in that network. For example, let’s say that this file is a new software build of a product that must be transferred to a group of remote machines, where its functionality has to be tested to make sure it can be delivered to the client.

# The Python-only solution

## Sequential

A simple solution for making the software build available to all the testing machines is to copy it to a specific directory whenever a new file is available. For additional safety, let’s suppose that we’re required to verify that the md5 sums of the original and destination files match, to ensure that the build file was copied correctly.
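For reference, the md5 verification step can also be performed in pure Python with the standard hashlib module. The sketch below is independent of the scripts that follow (which shell out to the md5sum command instead), and the file names are hypothetical placeholders:

```python
import hashlib

def md5sum(path, chunk_size=1 << 20):
    """Compute the md5 hex digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

# Hypothetical example: two copies of the same content produce the same digest
with open("/tmp/build_orig.bin", "wb") as f:
    f.write(b"fake build contents")
with open("/tmp/build_copy.bin", "wb") as f:
    f.write(b"fake build contents")
assert md5sum("/tmp/build_orig.bin") == md5sum("/tmp/build_copy.bin")
```

Reading in chunks keeps memory usage constant even for large build files.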

Assuming that /tmp is a suitable destination directory, the following script does the job:

```python
#!/usr/bin/python
"""
Copy a given file to a list of destination machines sequentially
"""

import os, argparse
import subprocess
import logging

def main(args):
    logging.basicConfig(level=logging.INFO, format="%(message)s")

    # Calculate md5 sum before copying the file
    orig_md5 = run_command("md5sum %s" % args.file).split()[0]

    # Copy the file to every requested machine and verify
    # that the md5 sum of the destination file is equal
    # to the md5 sum of the original file
    for machine in args.machines:
        run_command("scp %s %s:/tmp/" % (args.file, machine))
        dest_md5 = run_command("ssh %s md5sum /tmp/%s"
                               % (machine, os.path.basename(args.file))).split()[0]
        assert orig_md5 == dest_md5

def run_command(command_str):
    """
    Run a given command in another process and return its stdout
    """
    logging.info(command_str)
    return subprocess.Popen(command_str, stdout=subprocess.PIPE,
                            shell=True).communicate()[0]

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("file",
                        help="File to copy")
    parser.add_argument(metavar="machine", dest="machines", nargs="+",
                        help="List of machines to which the file must be copied")

    args = parser.parse_args()
    args.file = os.path.realpath(args.file)
    main(args)
```

Here it is assumed that ssh keys have been exchanged between the origin and destination machines, so that authentication happens automatically, without human intervention.

The script makes use of the Popen class from the subprocess module in the Python standard library. This powerful module provides the capability to launch new operating system processes and capture not only the return code, but also the standard output and error streams. Note that the Popen class cannot invoke commands on a remote machine by itself; however, as can be seen in the code, ssh and related commands may be used to launch processes on remote machines when properly configured.
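The pattern can be illustrated in isolation with a minimal sketch, using a harmless local echo command instead of ssh:

```python
import subprocess

# Launch a local command through the shell, capturing its standard output
proc = subprocess.Popen("echo hello", stdout=subprocess.PIPE, shell=True)
stdout, _ = proc.communicate()  # waits for the process to finish

print(stdout)           # the captured standard output, as bytes
print(proc.returncode)  # the process return code, 0 on success
```

Note that communicate() both reads the output streams and waits for the child process to terminate, which is why returncode is available afterwards.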

For example, if the file of interest were STAF325-src.tar.gz (the STAF 3.2.5 source) and the remote machines were 192.168.1.1 and 192.168.1.2, then the file would be copied using the copy.py script as follows:

```
$ ./copy.py STAF325-src.tar.gz 192.168.1.{1,2}
md5sum STAF325-src.tar.gz
scp STAF325-src.tar.gz 192.168.1.1:/tmp/
ssh 192.168.1.1 md5sum /tmp/STAF325-src.tar.gz
scp STAF325-src.tar.gz 192.168.1.2:/tmp/
ssh 192.168.1.2 md5sum /tmp/STAF325-src.tar.gz
```

## Parallel

What would happen if the files were copied in parallel? For the copy itself, it might not make much sense, given that the network is probably the bottleneck and there would be no increase in performance. However, in the case of the md5sum operation, it’s a waste of time to wait for the operation to complete on one machine while the other sits essentially idle waiting for the next command. Clearly, it would be more interesting to make both machines do the job in parallel to take advantage of their CPU cycles.
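The expected gain can be sketched with a small experiment: two simulated slow tasks (time.sleep stands in for the remote md5sum computation) complete in roughly the time of one when run in parallel threads:

```python
import threading
import time

def slow_task():
    # Stand-in for a slow remote operation such as md5sum on a large file
    time.sleep(0.2)

start = time.time()
threads = [threading.Thread(target=slow_task) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start

# The two tasks overlap, so total time is close to 0.2s rather than 0.4s
assert elapsed < 0.35
```

Since the waiting happens on remote machines (or inside sleep), the threads spend their time blocked rather than competing for the local CPU, so the GIL is not a limiting factor here.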

A parallel implementation similar to the sequential one is displayed below:

```python
#!/usr/bin/python
"""
Copy a given file to a list of destination machines in parallel
"""

import os, argparse
import subprocess
import logging
import threading

def main(args):
    logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")
    orig_md5 = run_command("md5sum %s" % args.file).split()[0]

    # Create one thread per machine
    threads = [WorkingThread(machine, args.file, orig_md5)
               for machine in args.machines]

    # Run all threads
    for thread in threads:
        thread.start()

    # Wait for all threads to finish
    for thread in threads:
        thread.join()

class WorkingThread(threading.Thread):
    """
    Thread that performs the copy operation for one machine
    """
    def __init__(self, machine, orig_file, orig_md5):
        threading.Thread.__init__(self)

        self.machine = machine
        self.file = orig_file
        self.orig_md5 = orig_md5

    def run(self):
        # Copy file to remote machine
        run_command("scp %s %s:/tmp/" % (self.file, self.machine))

        # Calculate md5 sum of the file copied at the remote machine
        dest_md5 = run_command("ssh %s md5sum /tmp/%s"
                               % (self.machine, os.path.basename(self.file))).split()[0]
        assert self.orig_md5 == dest_md5

def run_command(command_str):
    """
    Run a given command in another process and return its stdout
    """
    logging.info(command_str)
    return subprocess.Popen(command_str, stdout=subprocess.PIPE,
                            shell=True).communicate()[0]

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("file",
                        help="File to copy")
    parser.add_argument(metavar="machine", dest="machines", nargs="+",
                        help="List of machines to which the file must be copied")

    args = parser.parse_args()
    args.file = os.path.realpath(args.file)
    main(args)
```

Here the same assumptions as in the sequential case are made.

In this solution, the work that was done inside the for loop is now implemented in the run method of a class that inherits from threading.Thread, a class that provides an easy way to create worker threads such as the ones in the example.

In this case, the output of the command, using the same arguments as in the previous example, is:

```
$ ./copy_parallel.py STAF325-src.tar.gz 192.168.1.{1,2}
MainThread: md5sum STAF325-src.tar.gz
Thread-1: scp STAF325-src.tar.gz 192.168.1.1:/tmp/
Thread-2: scp STAF325-src.tar.gz 192.168.1.2:/tmp/
Thread-2: ssh 192.168.1.2 md5sum /tmp/STAF325-src.tar.gz
Thread-1: ssh 192.168.1.1 md5sum /tmp/STAF325-src.tar.gz
```

As can be seen in the logs, the md5sum commands aren’t necessarily executed in the same order the threads were created. This solution isn’t much more complex than the sequential one, but it finishes earlier. Hence, when a CPU-intensive task must be performed on every machine, the parallel solution is more convenient, since the small increase in coding complexity pays off in execution performance.

# The Python+STAF solution

## Sequential

The solutions to the problem presented in the previous section are perfectly fine. However, some developers may find it cumbersome to write scripts from scratch using the Popen class, and would rather work with a platform in which features such as launching processes on remote machines are already implemented.

That’s where STAF (Software Testing Automation Framework) might be helpful. STAF is a framework that provides the ability to automate jobs, especially, but not exclusively, in testing environments. STAF is implemented as a process which runs on every machine and provides services that clients may use to accomplish different tasks. For more information regarding STAF, please refer to the project homepage.

```python
 1  #!/usr/bin/python
 2  """
 3  Copy a given file to a list of destination machines sequentially
 4  """
 5
 6  import os, argparse
 7  import subprocess
 8  import logging
 9  import PySTAF
10
11  def main(args):
12      logging.basicConfig(level=logging.INFO, format="%(message)s")
13      handle = PySTAF.STAFHandle(__file__)
14
15      # Calculate md5 sum before copying the file
16      orig_md5 = run_process_command(handle, "local", "md5sum %s" % args.file).split()[0]
17
18      # Copy the file to every requested machine and verify
19      # that the md5 sum of the destination file is equal
20      # to the md5 sum of the original file
21      for machine in args.machines:
22          copy_file(handle, args.file, machine)
23          dest_md5 = run_process_command(handle, machine, "md5sum /tmp/%s"
24                                         % os.path.basename(args.file)).split()[0]
25          assert orig_md5 == dest_md5
26
27      handle.unregister()
28
29  def run_process_command(handle, location, command_str):
30      """
31      Run a given command in another process and return its stdout
32      """
33      logging.info(command_str)
34
35      result = handle.submit(location, "PROCESS", "START SHELL COMMAND %s WAIT RETURNSTDOUT"
36                             % PySTAF.STAFWrapData(command_str))
37      assert result.rc == PySTAF.STAFResult.Ok
38
39      mc = PySTAF.unmarshall(result.result)
40      return mc.getRootObject()['fileList'][0]['data']
41
42  def copy_file(handle, filename, destination):
43      """
44      Copy a file to the given destination machine
45      """
46      logging.info("copying %s to %s" % (filename, destination))
47
48      result = handle.submit("local", "FS", "COPY FILE %s TODIRECTORY /tmp TOMACHINE %s"
49                             % (PySTAF.STAFWrapData(filename),
50                                PySTAF.STAFWrapData(destination)))
51      assert result.rc == PySTAF.STAFResult.Ok
52
53  if __name__ == "__main__":
54      parser = argparse.ArgumentParser(description=__doc__)
55      parser.add_argument("file",
56                          help="File to copy")
57      parser.add_argument(metavar="machine", dest="machines", nargs="+",
58                          help="List of machines to which the file must be copied")
59
60      args = parser.parse_args()
61      args.file = os.path.realpath(args.file)
62      main(args)
```

The code makes use of PySTAF, a Python library shipped with the STAF software that provides the ability to interact with the framework as a client. The typical usage of the library may be summarized as follows:

• Register a handle with STAF (line 13): communication with the server process is managed using handles. A client must have a handle to be able to send requests to local and/or remote machines.
• Submit requests (lines 35 and 48): once a handle is available, the client can use it to submit requests to any location and service. The two basic services used in this example are PROCESS, which launches processes on a machine the same way ssh was used in the Python-only version of the example; and FS, which copies files between machines the way scp was used in the Python-only solution.
• Check the result code (lines 37 and 51): after a request has been submitted, the result code should be checked to make sure that there wasn’t any communication or syntax problem.
• Unmarshall results (lines 39-40): when the standard output is captured, it must be unmarshalled before it can be used in Python, since responses are encoded in a language-independent format.
• Unregister the handle (line 27): when STAF isn’t needed anymore, it’s advisable to unregister the handle to free the resources allocated to the client in the server.
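To illustrate the unmarshalling step, the unmarshalled root object for a PROCESS START ... RETURNSTDOUT request behaves like a nested Python structure. The sketch below mimics its shape with plain dictionaries; the field values are hypothetical placeholders, and the exact layout is an approximation of the STAF marshalling format:

```python
# Approximate shape of the unmarshalled PROCESS result: a map whose
# 'fileList' entry holds one map per returned stream (stdout here).
# "abc123" is a placeholder standing in for a real md5 digest.
root_object = {
    "rc": "0",
    "fileList": [
        {"rc": "0", "data": "abc123  /tmp/STAF325-src.tar.gz\n"},
    ],
}

# The scripts extract the captured stdout with the same access pattern:
stdout = root_object["fileList"][0]["data"]
md5 = stdout.split()[0]
```

In the real scripts, PySTAF.unmarshall produces this structure from the encoded result string, and getRootObject() returns the top-level map.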

Compared with the Python-only solution, the advantages of STAF aren’t obvious at first sight. The handle syntax isn’t easier than creating Popen objects, and we have to deal with unmarshalling where we previously just parsed text. However, it has to be taken into account that STAF, as a framework, has a learning curve, and offers much more functionality than what is shown here, which makes it worthwhile. Please bear with me until section 5, in which the STAX solution will be shown, with an example that takes a completely different approach to the problem.

Using the script in this section, the output is much the same as in the previous sequential example:

```
$ ./staf_copy.py STAF325-src.tar.gz 192.168.1.{1,2}
md5sum STAF325-src.tar.gz
copying STAF325-src.tar.gz to 192.168.1.1
md5sum /tmp/STAF325-src.tar.gz
copying STAF325-src.tar.gz to 192.168.1.2
md5sum /tmp/STAF325-src.tar.gz
```

As in the previous section, the sequential solution suffers from the same problems when CPU-intensive tasks are to be performed; hence, the same comments apply.

## Parallel

When using STAF, the parallel solution requires the same changes that were explained before: create a new class that inherits from threading.Thread and implement the work in its run method. The code below shows how this might be implemented:

```python
#!/usr/bin/python
"""
Copy a given file to a list of destination machines in parallel
"""

import os, argparse
import subprocess
import logging
import threading
import PySTAF

def main(args):
    logging.basicConfig(level=logging.INFO, format="%(threadName)s %(message)s")
    handle = PySTAF.STAFHandle(__file__)
    orig_md5 = run_process_command(handle, "local", "md5sum %s" % args.file).split()[0]

    # Create one thread per machine
    threads = [WorkingThread(machine, args.file, orig_md5)
               for machine in args.machines]

    # Run all threads
    for thread in threads:
        thread.start()

    # Wait for all threads to finish
    for thread in threads:
        thread.join()

    handle.unregister()

class WorkingThread(threading.Thread):
    """
    Thread that performs the copy operation for one machine
    """
    def __init__(self, machine, orig_file, orig_md5):
        threading.Thread.__init__(self)

        self.machine = machine
        self.file = orig_file
        self.orig_md5 = orig_md5
        self.handle = PySTAF.STAFHandle("%s:%s" % (__file__, self.getName()))

    def run(self):
        # Copy file to remote machine
        copy_file(self.handle, self.file, self.machine)

        # Calculate md5 sum of the file copied at the remote machine
        dest_md5 = run_process_command(self.handle, self.machine, "md5sum /tmp/%s"
                                       % os.path.basename(self.file)).split()[0]
        assert self.orig_md5 == dest_md5
        self.handle.unregister()

def run_process_command(handle, location, command_str):
    """
    Run a given command in another process and return its stdout
    """
    logging.info(command_str)

    result = handle.submit(location, "PROCESS", "START SHELL COMMAND %s WAIT RETURNSTDOUT"
                           % PySTAF.STAFWrapData(command_str))
    assert result.rc == PySTAF.STAFResult.Ok

    mc = PySTAF.unmarshall(result.result)
    return mc.getRootObject()['fileList'][0]['data']

def copy_file(handle, filename, destination):
    """
    Copy a file to the given destination machine
    """
    logging.info("copying %s to %s" % (filename, destination))

    result = handle.submit("local", "FS", "COPY FILE %s TODIRECTORY /tmp TOMACHINE %s"
                           % (PySTAF.STAFWrapData(filename),
                              PySTAF.STAFWrapData(destination)))
    assert result.rc == PySTAF.STAFResult.Ok

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("file",
                        help="File to copy")
    parser.add_argument(metavar="machine", dest="machines", nargs="+",
                        help="List of machines to which the file must be copied")

    args = parser.parse_args()
    args.file = os.path.realpath(args.file)
    main(args)
```

As before, this solution is faster since it takes advantage of multiple CPUs working on the md5sum calculation instead of just one at a time. The output of invoking the script could be:

```
$ ./staf_copy_parallel.py STAF325-src.tar.gz 192.168.1.{1,2}
MainThread md5sum STAF325-src.tar.gz
Thread-1 copying STAF325-src.tar.gz to 192.168.1.1
Thread-2 copying STAF325-src.tar.gz to 192.168.1.2
Thread-2 md5sum /tmp/STAF325-src.tar.gz
Thread-1 md5sum /tmp/STAF325-src.tar.gz
```

This time it can be seen that the md5sum calculations don’t necessarily start in the same order as the file copy operations.

Once again, this solution is slightly more complex, but the gain in performance makes it worthwhile when dealing with tasks with a high computational cost.