< SQL < XML Dumps

A speedy tour of the low level command classes

So you want to know all about the cruft in commandmanagement.py! Yeah, I didn’t think so. But let’s get started anyways.

Here’s some sample code that runs a pipeline of commands (think: zcat | grep | awk ) and later runs a series of commands (think: grep this | wc -l; grep that | wc -l )

The relevant classes are CommandPipeline and CommandSeries, so with that in mind, let’s look at the code.

Sample code

Some of the code to run the command series was taken from ProcessMonitor, because we pretty much never run a command series directly. Instead, command series are run by making a list of them, even a list with only one command series in it, and letting CommandsInParallel handle it.

 

#!/usr/bin/python3
'''
sample methods illustrating the use and innards of some
classes from the command_management module
'''
import fcntl
import select
import sys
import os
from dumps.commandmanagement import CommandPipeline, CommandSeries


def usage():
    '''
    write a one-line usage message to stderr and terminate
    the script with exit status 1
    '''
    message = "Usage: sample_commands.py <path-to-python-dumps-repo>\n"
    sys.stderr.write(message)
    # sys.exit(1) is exactly this: it raises SystemExit with code 1
    raise SystemExit(1)


running one pipeline of commands

 

def run_pipeline(pipeline, quiet=False):
    '''
    run a pipeline of commands that produces output and display
    that output if there is any

    args:
        pipeline: the commands to run, handed unchanged to CommandPipeline
        quiet:    handed to CommandPipeline as its quiet keyword argument

    prints either the failed commands with their return codes, or
    "Result:" followed by the (decoded, right-stripped) output;
    returns nothing
    '''
    proc = CommandPipeline(pipeline, quiet=quiet)

    # This method expects output to be small; don't use this for any commands
    # that may produce megabytes of output
    proc.run_pipeline_get_output()

    return_ok = bool(proc.exited_successfully())
    if not return_ok:
        print("Some commands failed:", proc.get_failed_cmds_with_retcode())
    else:
        output = proc.output()
        if output:
            # output returned is in bytes; python supports bytes and unicode strings
            output = output.decode("utf-8").rstrip()

        # printed even when there is no output, so the caller always
        # sees a "Result:" line for a successful pipeline
        print("Result:", output)

waiting for command output or completion

 

def setup_polling_for_process(proc):
    '''
    set up a poller for stdout, stderr for the specified process

    args:
        proc: object with a stderr attribute and a stdout attribute;
              stderr must be a file-like object with fileno(), stdout
              may be falsy (e.g. None), in which case it is skipped

    each registered stream is watched for POLLIN and POLLPRI, and its
    file descriptor is switched to non-blocking mode

    returns the select.poll object
    '''
    # this code is simplified from that in ProcessMonitor in the command_management module
    poller = select.poll()

    # stderr is always polled; stdout only when the process has one
    streams = [proc.stderr]
    if proc.stdout:
        streams.append(proc.stdout)

    for stream in streams:
        poller.register(stream, select.POLLIN | select.POLLPRI)
        filed = stream.fileno()
        # add O_NONBLOCK to whatever status flags are already set
        flags = fcntl.fcntl(filed, fcntl.F_GETFL)
        fcntl.fcntl(filed, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    return poller


def handle_events(poller, waiting, series, proc, quiet):
    '''
    process a list of (file descriptor, event) pairs as returned by
    poll(): read what is available from each readable descriptor, and
    on a poll error unregister the descriptor and wait for the process

    args:
        poller:  poll object the descriptors are registered with
        waiting: list of (fd, event) tuples
        series:  command series whose in-progress pipeline stashes
                 and interprets each event
        proc:    process whose stdout/stderr are being read
        quiet:   if false, print the pid and return code once the
                 process has been waited for

    returns (stdout text, stderr text, command completed flag)
    '''
    # this code is simplified from that in ProcessMonitor in the command_management module
    stdout_chunks = []
    stderr_chunks = []
    finished = False
    for descriptor, poll_event in waiting:
        series.in_progress_pipeline().set_poll_state(poll_event)

        # check_poll_ready_for_read checks if the event, which we have
        # stashed in an attribute, has one of the flags
        # select.POLLIN or select.POLLPRI set
        if series.in_progress_pipeline().check_poll_ready_for_read():
            data = os.read(descriptor, 1024)
            if not data:
                # possible eof? what would cause this?
                continue
            if descriptor == proc.stderr.fileno():
                stderr_chunks.append(data.decode("utf-8"))
            elif descriptor == proc.stdout.fileno():
                stdout_chunks.append(data.decode("utf-8"))
            continue

        # check_for_poll_errors checks if the stashed event has one of
        # the flags select.POLLHUP, select.POLLNVAL or select.POLLERR set
        if not series.in_progress_pipeline().check_for_poll_errors():
            continue

        poller.unregister(descriptor)
        # Note: if the fd closed prematurely and the proc then runs for hours to
        # completion, we will get no updates here.
        proc.wait()
        if not quiet:
            print("returned from {pid} with {retcode}".format(
                pid=proc.pid, retcode=proc.returncode))
        finished = True
    return "".join(stdout_chunks), "".join(stderr_chunks), finished

​

running a series of command pipelines, getting the output

 

def get_series_output(series, quiet=False):
    '''
    walk through the pipelines of an already started command series,
    collecting stdout and stderr from each and printing them (as
    "Result:" and "Errors:") once that pipeline's last process is done

    args:
        series: started command series
        quiet:  passed on to handle_events
    '''
    # only one pipeline of the series runs at a time; the series object
    # knows which one that is, and which process at the end of that
    # pipeline may produce output. keep going for as long as there is
    # such a process.
    while series.process_producing_output():
        current_proc = series.process_producing_output()

        # poll the process's stdout/stderr so output is captured as it
        # arrives. everything is accumulated in memory, on the assumption
        # that there are not megabytes of either output or errors.
        poller = setup_polling_for_process(current_proc)

        # this code is simplified from that in ProcessMonitor in the
        # command_management module, and it has been split up into the
        # smaller methods handle_events and setup_polling_for_process.
        stdout_parts = []
        stderr_parts = []
        finished = False
        while not finished:
            # timeout is in milliseconds
            ready = poller.poll(500)
            if not ready:
                continue
            fresh_out, fresh_err, finished = handle_events(
                poller, ready, series, current_proc, quiet)
            if fresh_out:
                stdout_parts.append(fresh_out)
            if fresh_err:
                stderr_parts.append(fresh_err)

        collected_out = "".join(stdout_parts)
        collected_err = "".join(stderr_parts)
        if collected_out:
            print("Result:", collected_out)
        if collected_err:
            print("Errors:", collected_err)

        # run next command in series, if any
        # this checks to be sure that the current pipeline's last command
        # is indeed complete, before starting the next pipeline;
        # it calls check_for_poll_errors() (it expects to see one)
        # and check_poll_ready_for_read() (it expects this to be false)
        series.continue_commands()


def run_series(series, quiet=False, shell=False):
    '''
    build a CommandSeries from the given pipelines, start it, show
    whatever output each pipeline produces, and report the pipelines
    with errors if the series did not exit successfully
    '''
    command_series = CommandSeries(series, quiet, shell)
    command_series.start_commands()
    get_series_output(command_series, quiet)

    if command_series.exited_successfully():
        return
    print("Some commands failed:", command_series.pipelines_with_errors())

putting it all together

We use the repo path and files in the repo to grep for text we know is in there.

def do_main():
    '''
    entry point

    expects exactly one command line argument, the path to the repo;
    runs a grep | wc pipeline against dumps/commandmanagement.py in
    that repo with and without quiet, then runs a series of two such
    pipelines with and without quiet
    '''
    if len(sys.argv) != 2:
        usage()
    repo_path = sys.argv[1]
    # every pipeline below greps the same file, so build its path once
    module_path = os.path.join(repo_path, "dumps/commandmanagement.py")
    print("")
    # This is a command pipeline: two or more commands as lists, the
    # output of each to be piped to the next.
    # Note that the full path of each command (grep, wc) is given.
    pipeline = [["/bin/grep", "command", module_path],
                ["/usr/bin/wc", "-l"]]
    print("Running pipeline with default args:")
    print("----------------------")
    run_pipeline(pipeline)
    print("")
    print("Running pipeline with quiet:")
    print("----------------------")
    run_pipeline(pipeline, quiet=True)
    pipeline_one = [["/bin/grep", "command", module_path],
                    ["/usr/bin/wc", "-l"]]
    pipeline_two = [["/bin/grep", "command", module_path],
                    ["/usr/bin/wc", "-c"]]
    # This is a command series: two or more pipelines which are run
    # one after the other, waiting for one to complete before the
    # next is started.
    series = [pipeline_one, pipeline_two]
    print("\n=====================\n")
    print("Running series with default args:")
    print("----------------------")
    run_series(series)
    print("")
    print("Running series with quiet:")
    print("----------------------")
    run_series(series, quiet=True)


if __name__ == '__main__':
    do_main()

Sample output

Place the module (sample_commands.py) in the subdirectory xmldumps-backup of the python dumps repo, and run it with no args to see how you should run it. TL;DR: pass the full path of the dumps repo as the only arg.

 

[ariel@bigtrouble xmldumps-backup]$ python sample_commands.py /home/ariel/dumps/xmldumps-backup

Running pipeline with default args:
----------------------
command /bin/grep command /home/ariel/dumps/xmldumps-backup/dumps/commandmanagement.py (260851) started...
command /usr/bin/wc -l (260852) started...
Result: 131

Running pipeline with quiet:
----------------------
Result: 131

=====================

Running series with default args:
----------------------
command /bin/grep command /home/ariel/dumps/xmldumps-backup/dumps/commandmanagement.py (260855) started...
command /usr/bin/wc -l (260856) started...
returned from 260856 with 0
Result: 131

returned from 260856 with 0
returned from 260856 with 0
command /bin/grep command /home/ariel/dumps/xmldumps-backup/dumps/commandmanagement.py (260857) started...
command /usr/bin/wc -c (260858) started...
returned from 260858 with 0
Result: 6989

returned from 260858 with 0
returned from 260858 with 0

Running series with quiet:
----------------------
Result: 131

Result: 6989
This article is issued from Mediawiki. The text is licensed under Creative Commons - Attribution - Sharealike. Additional terms may apply for the media files.