
The reader should note that the solution is only intended to explain how Python and STAF may be used. No claim is made that the solution presented here is the best one in any way, just that it is one more option that the reader may consider in future developments.

The Problem

Let’s imagine that we have a computer network in which a machine periodically generates some kind of file with information that is of interest to other machines in that network. For example, let’s say that this file is a new software build of a product that must be transferred to a group of remote machines, on which its functionality has to be tested to make sure it can be delivered to the client.

The Python-only solution

Sequential

A simple solution to make the software build available to all the testing machines could be to copy it to a specific directory whenever a new file is available. As an integrity check, let’s suppose that we’re required to verify that the md5 sums of the original and destination files are equal, to ensure that the build file was copied correctly.
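The md5 comparison itself can also be computed from Python rather than by calling md5sum. The following is a minimal sketch using the standard hashlib module; the function name file_md5 and the chunk size are illustrative choices, not something the article's scripts rely on.

```python
import hashlib

def file_md5(path, chunk_size=1 << 20):
    """Return the hex md5 digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as stream:
        # Read fixed-size chunks so that large build files
        # are never loaded into memory all at once
        for chunk in iter(lambda: stream.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The returned string has the same format as the first field printed by md5sum, so it could be compared directly with the checksum reported by a remote machine.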

If /tmp is taken as the destination directory, then the following script will do the job:

    #!/usr/bin/python
    """
    Copy a given file to a list of destination machines sequentially
    """

    import argparse
    import logging
    import os
    import subprocess

    def main(args):
        logging.basicConfig(level=logging.INFO, format="%(message)s")

        # Calculate md5 sum before copying the file
        orig_md5 = run_command("md5sum %s" % args.file).split()[0]

        # Copy the file to every requested machine and verify
        # that md5 sum of the destination file is equal
        # to the md5 sum of the original file
        for machine in args.machines:
            run_command("scp %s %s:/tmp/" % (args.file, machine))
            dest_md5 = run_command("ssh %s md5sum /tmp/%s"
                                   % (machine, os.path.basename(args.file))).split()[0]
            assert orig_md5 == dest_md5, "md5 mismatch on %s" % machine

    def run_command(command_str):
        """
        Run a given command in another process and return stdout
        """
        logging.info(command_str)
        return subprocess.Popen(command_str, stdout=subprocess.PIPE,
                                shell=True).communicate()[0]

    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description=__doc__)
        parser.add_argument("file",
                            help="File to copy")
        parser.add_argument(metavar="machine", dest="machines", nargs="+",
                            help="List of machines to which file must be copied")

        args = parser.parse_args()
        args.file = os.path.realpath(args.file)
        main(args)

Here it is assumed that ssh keys have been exchanged between origin and destination machines for automatic authentication without human intervention.

The script makes use of the Popen class in the subprocess module of the Python standard library. This powerful module provides the capability to launch new operating system processes and capture not only the result code, but also the standard output and error streams. Note that the Popen class cannot invoke commands on a remote machine by itself. However, as can be seen in the code, ssh and related commands may be used to launch processes on remote machines when configured properly.
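To illustrate the point about capturing the result code and both streams, here is a minimal sketch. The helper name run_and_capture is hypothetical, and the Python interpreter itself is used as the child command only so that the example does not depend on any particular shell utility.

```python
import subprocess
import sys

def run_and_capture(argv):
    """Run a command given as an argument list; return (rc, stdout, stderr)."""
    process = subprocess.Popen(argv, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    # communicate() waits for the process to finish and reads both pipes
    stdout, stderr = process.communicate()
    return process.returncode, stdout, stderr

if __name__ == "__main__":
    child_code = "import sys; sys.stderr.write('oops'); sys.exit(3)"
    rc, out, err = run_and_capture([sys.executable, "-c", child_code])
    print(rc, out, err)
```

Passing the command as a list instead of a single string with shell=True also sidesteps quoting problems when a file name contains spaces, although a remote command given to ssh is still interpreted by the remote shell.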

For example, if the file of interest was STAF325-src.tar.gz (STAF 3.2.5 source) and the remote machines were 192.168.1.1 and 192.168.1.2, then the file would be copied using the copy.py script in the following way:

$ ./copy.py STAF325-src.tar.gz 192.168.1.{1,2}
md5sum STAF325-src.tar.gz
scp STAF325-src.tar.gz 192.168.1.1:/tmp/
ssh 192.168.1.1 md5sum /tmp/STAF325-src.tar.gz
scp STAF325-src.tar.gz 192.168.1.2:/tmp/
ssh 192.168.1.2 md5sum /tmp/STAF325-src.tar.gz

Parallel

What would happen if the files were copied in parallel? For the copy itself there might not be much to gain, since the network is probably the bottleneck and parallel transfers wouldn’t improve performance. The md5sum operation is a different matter: waiting for it to complete on one machine while the other sits idle waiting for its next command is a waste of time. Clearly, it would be better to have both machines do the job in parallel to take advantage of their CPU cycles.

A parallel implementation similar to the sequential one is displayed below:

    #!/usr/bin/python
    """
    Copy a given file to a list of destination machines in parallel
    """

    import argparse
    import logging
    import os
    import subprocess
    import threading

    def main(args):
        logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")
        orig_md5 = run_command("md5sum %s" % args.file).split()[0]

        # Create one thread per machine
        threads = [WorkingThread(machine, args.file, orig_md5)
                   for machine in args.machines]

        # Run all threads
        for thread in threads:
            thread.start()

        # Wait for all threads to finish
        for thread in threads:
            thread.join()

    class WorkingThread(threading.Thread):
        """
        Thread that performs the copy operation for one machine
        """
        def __init__(self, machine, orig_file, orig_md5):
            threading.Thread.__init__(self)

            self.machine = machine
            self.file = orig_file
            self.orig_md5 = orig_md5

        def run(self):
            # Copy file to remote machine
            run_command("scp %s %s:/tmp/" % (self.file, self.machine))

            # Calculate md5 sum of the file copied at the remote machine
            dest_md5 = run_command("ssh %s md5sum /tmp/%s"
                                   % (self.machine, os.path.basename(self.file))).split()[0]
            assert self.orig_md5 == dest_md5, "md5 mismatch on %s" % self.machine

    def run_command(command_str):
        """
        Run a given command in another process and return stdout
        """
        logging.info(command_str)
        return subprocess.Popen(command_str, stdout=subprocess.PIPE,
                                shell=True).communicate()[0]

    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description=__doc__)
        parser.add_argument("file",
                            help="File to copy")
        parser.add_argument(metavar="machine", dest="machines", nargs="+",
                            help="List of machines to which file must be copied")

        args = parser.parse_args()
        args.file = os.path.realpath(args.file)
        main(args)

Here the same assumptions as in the sequential case are made.

In this solution, the work that was done inside the for loop is now implemented in the run method of a class that inherits from threading.Thread, which provides an easy way to create worker threads such as the ones in the example.
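One detail worth keeping in mind with this pattern is that an exception raised inside run (a failed assert, for instance) ends only that thread and is not re-raised by join. A minimal sketch of one way to surface failures to the main thread follows; the names CheckedThread, run_all and the error attribute are hypothetical and chosen only for illustration.

```python
import threading

class CheckedThread(threading.Thread):
    """Thread that runs a callable and remembers any exception it raised."""
    def __init__(self, work, *work_args):
        threading.Thread.__init__(self)
        self.work = work
        self.work_args = work_args
        self.error = None

    def run(self):
        try:
            self.work(*self.work_args)
        except Exception as error:
            # Store the failure so the main thread can inspect it after join()
            self.error = error

def run_all(threads):
    """Start every thread, wait for all of them and return the failed ones."""
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return [thread for thread in threads if thread.error is not None]
```

With something along these lines, the main thread could report which machines failed the checksum verification and exit with a non-zero status.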

In this case, the output of the command, using the same arguments as in the previous example, is:

$ ./copy_parallel.py STAF325-src.tar.gz 192.168.1.{1,2}
MainThread: md5sum STAF325-src.tar.gz
Thread-1: scp STAF325-src.tar.gz 192.168.1.1:/tmp/
Thread-2: scp STAF325-src.tar.gz 192.168.1.2:/tmp/
Thread-2: ssh 192.168.1.2 md5sum /tmp/STAF325-src.tar.gz
Thread-1: ssh 192.168.1.1 md5sum /tmp/STAF325-src.tar.gz

As can be seen in the logs, the md5sum commands aren’t necessarily executed in the same order in which the threads were created. This solution isn’t much more complex than the sequential one, but it finishes earlier. Hence, when a CPU-intensive task must be performed on every machine, the parallel solution is more convenient, since the small increase in coding complexity pays off in execution time.
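The difference in elapsed time can be sketched without any remote machine at all. In the example below, time.sleep stands in for a remote md5sum (the remote CPU works while the local script only waits); the function names and the 0.2 second duration are made up for illustration and are not measurements.

```python
import threading
import time

def fake_md5sum(machine, seconds=0.2):
    """Stand-in for a remote md5sum: the local side simply waits."""
    time.sleep(seconds)

def run_sequential(machines):
    """Wait for each machine in turn and return the elapsed time."""
    start = time.time()
    for machine in machines:
        fake_md5sum(machine)
    return time.time() - start

def run_parallel(machines):
    """Wait for all machines at once and return the elapsed time."""
    start = time.time()
    threads = [threading.Thread(target=fake_md5sum, args=(machine,))
               for machine in machines]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return time.time() - start
```

The sequential time grows with the number of machines, while the parallel time stays close to that of the slowest single machine.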

The Python+STAF solution

Sequential

The solutions to the problem presented in the previous section are perfectly fine. However, some developers may find it cumbersome to write scripts from scratch using the Popen class and may prefer to work with a platform in which features such as launching processes on remote machines are already implemented.

That’s where STAF (Software Testing Automation Framework) might be helpful. STAF is a framework that provides the ability to automate jobs, especially, but not exclusively, in testing environments. It is implemented as a process that runs on every machine and provides services that clients may use to accomplish different tasks. For more information regarding STAF, please refer to the project homepage.

The Python+STAF sequential version of the program that has been used as an example throughout this article is below:

 1  #!/usr/bin/python
 2  """
 3  Copy a given file to a list of destination machines sequentially
 4  """
 5
 6  import argparse
 7  import os
 8  import logging
 9  import PySTAF
10
11  def main(args):
12      logging.basicConfig(level=logging.INFO, format="%(message)s")
13      handle = PySTAF.STAFHandle(__file__)
14
15      # Calculate md5 sum before copying the file
16      orig_md5 = run_process_command(handle, "local", "md5sum %s" % args.file).split()[0]
17
18      # Copy the file to every requested machine and verify
19      # that md5 sum of the destination file is equal
20      # to the md5 sum of the original file
21      for machine in args.machines:
22          copy_file(handle, args.file, machine)
23          dest_md5 = run_process_command(handle, machine, "md5sum /tmp/%s"
24                                         % os.path.basename(args.file)).split()[0]
25          assert orig_md5 == dest_md5
26
27      handle.unregister()
28
29  def run_process_command(handle, location, command_str):
30      """
31      Run a given command in another process and return stdout
32      """
33      logging.info(command_str)
34
35      result = handle.submit(location, "PROCESS", "START SHELL COMMAND %s WAIT RETURNSTDOUT"
36                             % PySTAF.STAFWrapData(command_str))
37      assert result.rc == PySTAF.STAFResult.Ok
38
39      mc = PySTAF.unmarshall(result.result)
40      return mc.getRootObject()['fileList'][0]['data']
41
42  def copy_file(handle, filename, destination):
43      """
44      Copy a given file to /tmp on the destination machine
45      """
46      logging.info("copying %s to %s" % (filename, destination))
47
48      result = handle.submit("local", "FS", "COPY FILE %s TODIRECTORY /tmp TOMACHINE %s"
49                             % (PySTAF.STAFWrapData(filename),
50                                PySTAF.STAFWrapData(destination)))
51      assert result.rc == PySTAF.STAFResult.Ok
52
53  if __name__ == "__main__":
54      parser = argparse.ArgumentParser(description=__doc__)
55      parser.add_argument("file",
56                          help="File to copy")
57      parser.add_argument(metavar="machine", dest="machines", nargs="+",
58                          help="List of machines to which file must be copied")
59
60      args = parser.parse_args()
61      args.file = os.path.realpath(args.file)
62      main(args)

The code makes use of PySTAF, a Python library shipped with the STAF software that provides the ability to interact with the framework as a client. The typical usage of the library may be summarized as follows:

  • Register a handle in STAF (line 13): The communication with the server process is managed using handles. A client must have a handle to be able to send requests to local and/or remote machines.
  • Submit requests (lines 35 and 48): Once the handle is available at the client, the client can use it to submit requests to any location and service. The two basic services that are used in this example are PROCESS, which is used to launch processes on a machine the same way ssh was used in the Python-only version of the example; and FS, which is used to copy files between different machines as scp was used in the Python-only solution.
  • Check result code (lines 37 and 51): After a request has been submitted, the result code should be checked to make sure that there wasn’t any communication or syntax problem.
  • Unmarshall results (lines 39-40): When the standard output is captured, it must be unmarshalled before using it in Python, since responses are encoded in a language-independent format.
  • Unregister handle (line 27): When STAF isn’t needed anymore, it’s advisable to unregister the handle to free resources allocated to the client in the server.
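The nested lookup used to read the standard output may be easier to follow with the structure spelled out. The sketch below works on a plain dictionary shaped like the root object that the script indexes with ['fileList'][0]['data']. The additional 'rc' keys (the return code of the process and of the stdout retrieval) are an assumption based on the STAF PROCESS service documentation and should be checked against the STAF version in use; the helper name extract_stdout is hypothetical.

```python
def extract_stdout(root):
    """
    Return the captured stdout from an unmarshalled
    PROCESS START ... WAIT RETURNSTDOUT result, raising an error
    if the process or the stdout retrieval reported a failure.
    """
    # Assumed layout: {'rc': ..., 'fileList': [{'rc': ..., 'data': ...}]}
    process_rc = str(root.get('rc', '0'))
    if process_rc != '0':
        raise RuntimeError("process exited with return code %s" % process_rc)

    stdout_info = root['fileList'][0]
    file_rc = str(stdout_info.get('rc', '0'))
    if file_rc != '0':
        raise RuntimeError("stdout could not be retrieved (rc=%s)" % file_rc)
    return stdout_info['data']
```

The reason such a check may be useful is that the result code of the request only indicates that STAF handled the request; under the assumption above, whether md5sum itself succeeded is reported separately inside the unmarshalled result.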

Compared with the Python-only solution, the advantages of STAF aren’t apparent at first sight. The handle syntax isn’t easier than creating Popen objects, and we have to deal with marshalling where previously we were just parsing text. However, it has to be taken into account that, as a framework, STAF has a learning curve and offers much more functionality than what is used here, which makes it worthwhile. Please bear with me until section 5, in which the STAX solution will be shown, with an example that takes a completely different approach to the problem.

Using the script in this section, the output would be pretty much the same as the previous sequential example:

$ ./staf_copy.py STAF325-src.tar.gz 192.168.1.{1,2}
md5sum STAF325-src.tar.gz
copying STAF325-src.tar.gz to 192.168.1.1
md5sum /tmp/STAF325-src.tar.gz
copying STAF325-src.tar.gz to 192.168.1.2
md5sum /tmp/STAF325-src.tar.gz

As in the previous section, the sequential solution suffers from the same problems when CPU-intensive tasks are to be performed. Hence, the same comments apply.

Parallel

When using STAF, the parallel solution requires the same changes that were explained before. That is, create a new class that inherits from threading.Thread and implement the working threads. The code below shows how this might be implemented:

    #!/usr/bin/python
    """
    Copy a given file to a list of destination machines in parallel
    """

    import argparse
    import logging
    import os
    import threading
    import PySTAF

    def main(args):
        logging.basicConfig(level=logging.INFO, format="%(threadName)s %(message)s")
        handle = PySTAF.STAFHandle(__file__)
        orig_md5 = run_process_command(handle, "local", "md5sum %s" % args.file).split()[0]

        # Create one thread per machine
        threads = [WorkingThread(machine, args.file, orig_md5)
                   for machine in args.machines]

        # Run all threads
        for thread in threads:
            thread.start()

        # Wait for all threads to finish
        for thread in threads:
            thread.join()

        handle.unregister()

    class WorkingThread(threading.Thread):
        """
        Thread that performs the copy operation for one machine
        """
        def __init__(self, machine, orig_file, orig_md5):
            threading.Thread.__init__(self)

            self.machine = machine
            self.file = orig_file
            self.orig_md5 = orig_md5
            # Each thread registers its own handle, named after the thread
            self.handle = PySTAF.STAFHandle("%s:%s" % (__file__, self.name))

        def run(self):
            try:
                # Copy file to remote machine
                copy_file(self.handle, self.file, self.machine)

                # Calculate md5 sum of the file copied at the remote machine
                dest_md5 = run_process_command(self.handle, self.machine, "md5sum /tmp/%s"
                                               % os.path.basename(self.file)).split()[0]
                assert self.orig_md5 == dest_md5
            finally:
                # Unregister the handle even if the copy or the check fails
                self.handle.unregister()

    def run_process_command(handle, location, command_str):
        """
        Run a given command in another process and return stdout
        """
        logging.info(command_str)

        result = handle.submit(location, "PROCESS", "START SHELL COMMAND %s WAIT RETURNSTDOUT"
                               % PySTAF.STAFWrapData(command_str))
        assert result.rc == PySTAF.STAFResult.Ok

        mc = PySTAF.unmarshall(result.result)
        return mc.getRootObject()['fileList'][0]['data']

    def copy_file(handle, filename, destination):
        """
        Copy a given file to /tmp on the destination machine
        """
        logging.info("copying %s to %s" % (filename, destination))

        result = handle.submit("local", "FS", "COPY FILE %s TODIRECTORY /tmp TOMACHINE %s"
                               % (PySTAF.STAFWrapData(filename),
                                  PySTAF.STAFWrapData(destination)))
        assert result.rc == PySTAF.STAFResult.Ok

    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description=__doc__)
        parser.add_argument("file",
                            help="File to copy")
        parser.add_argument(metavar="machine", dest="machines", nargs="+",
                            help="List of machines to which file must be copied")

        args = parser.parse_args()
        args.file = os.path.realpath(args.file)
        main(args)

As before, this solution is faster since it takes advantage of multiple CPUs working on the md5sum calculation at the same time instead of just one at a time. The output we get when invoking the script could be:

$ ./staf_copy_parallel.py STAF325-src.tar.gz 192.168.1.{1,2}
MainThread md5sum STAF325-src.tar.gz
Thread-1 copying STAF325-src.tar.gz to 192.168.1.1
Thread-2 copying STAF325-src.tar.gz to 192.168.1.2
Thread-2 md5sum /tmp/STAF325-src.tar.gz
Thread-1 md5sum /tmp/STAF325-src.tar.gz

This time it can be seen that the md5sum calculations don’t necessarily start in the same order as the file copy operations.

Once again, this solution is slightly more complex, but the gain in performance makes it convenient when dealing with tasks with high computational cost.

 

 
