关于我们

质量为本、客户为根、勇于拼搏、务实创新

< 返回

Ansible AdHoc & playbook API + 动态生成Inventory + 结果关注

发布时间:2022-09-11 23:37:37

为以后用的方便,记录一下(主要是怕忘,又得折腾半天)

直接贴代码,没太多注释,看不懂的看下源码(PyCharm 中可用 Ctrl/Cmd+B 跳转到定义)。

Ansible 2.0 之后的 API 比 2.0 之前要复杂,但使用起来的自由度更好,可根据自己需求修改 Ansible API 的使用方法;还有功能也更强大。

我主要是使用这个 API 配合 Djcelery 实现监控系统的数据采集功能,好处是不再需要为每种服务器再开发一个 agent。这样使用的问题可能主要是程序的负载性能及程序并发性能,我还没测试。

# -*- coding:utf8 -*-

import os
import sys
import logging
logger = logging.getLogger('django')

# from collections import namedtuple
from ansible.inventory import Inventory
from ansible.vars import VariableManager
from ansible.parsing.dataloader import DataLoader
from ansible.inventory.group import Group
from ansible.inventory.host import Host
from ansible.playbook.play import Play
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase

# 初始化选项参数
#host_list=C.DEFAULT_HOST_LIST, # ex: /etc/ansible/hosts, legacy usage #执行host清单,如果不指定文件,会读取ansible.cfg文件的inventory = xx字段。
#module_path=None, # ex: /usr/share/ansible#模块路径,比如:/usr/share/ansible ex==example。
#module_name=C.DEFAULT_MODULE_NAME, # ex: copy#模块的名字。
#module_args=C.DEFAULT_MODULE_ARGS, # ex: "src=/tmp/a dest=/tmp/b"#模块的参数。
#forks=C.DEFAULT_FORKS, # parallelism level并发进程数。
#timeout=C.DEFAULT_TIMEOUT, # SSH timeout#ssh 连接超时。
#pattern=C.DEFAULT_PATTERN, # which hosts? ex: 'all', 'acme.example.org'#host清单里,匹配的组 host清单。
#remote_user=C.DEFAULT_REMOTE_USER, # ex: 'username'#远程登录用户和执行用户。
#remote_pass=C.DEFAULT_REMOTE_PASS, # ex: 'password123' or None if using key#远程登录用户密码。
#remote_port=None, # if SSH on different ports#ssh连接端口。
#private_key_file=C.DEFAULT_PRIVATE_KEY_FILE, # if not using keys/passwords#私钥位置,如果不是ssh 连接话。
#background=0, # async poll every X seconds, else 0 for non-async#异步参数。
#basedir=None, # directory of playbook, if applicable#playbook 路径。
#setup_cache=None, # used to share fact data w/ other tasks#搜集远程节点的信息。
#vars_cache=None, # used to store variables about hosts#host清单变量。
#transport=C.DEFAULT_TRANSPORT, # 'ssh', 'paramiko', 'local'#连接远程主机的方式,是ssh还是paramiko....
#conditional='True', # run only if this fact expression evals to true #未知
#callbacks=None, # used for output#用于结果输出。
#module_vars=None, # a playbooks internals thing##模块变量。
#play_vars=None, #
#play_file_vars=None, #
#role_vars=None, #
#role_params=None, #
#default_vars=None, #
#extra_vars=None, # extra vars specified with the playbook(s)#其他额外一些参数。
#is_playbook=False, # running from playbook or not?
#inventory=None, # reference to Inventory object
#subset=None, # subset pattern
#check=False, # don't make any changes, just try to probe for potential changes#测试play,看看对hostlist产生哪些变化。
#diff=False, # whether to show diffs for template files that change
#environment=None, # environment variables (as dict) to use inside the command
#complex_args=None, # structured data in addition to module_args, must be a dict
#error_on_undefined_vars=C.DEFAULT_UNDEFINED_VAR_BEHAVIOR, # ex. False
#accelerate=False, # use accelerated connection
#accelerate_ipv6=False, # accelerated connection w/ IPv6
#accelerate_port=None, # port to use with accelerated connection
#vault_pass=None,
#run_hosts=None, # an optional list of pre-calculated hosts to run on
#no_log=False, # option to enable/disable logging for a given task
#run_once=False, # option to enable/disable host bypass loop for a given task
#become=False, # whether to run privilege escalation or not
#become_method=C.DEFAULT_BECOME_METHOD,
#become_user=C.DEFAULT_BECOME_USER, # ex: 'root'
#become_pass=C.DEFAULT_BECOME_PASS, # ex: 'password123' or None
#become_exe=C.DEFAULT_BECOME_EXE, # ex: /usr/local/bin/sudo

# OPTIONS = namedtuple('OPTIONS',
#                        ['module_path', 'extra_vars', 'forks', 'become', 'become_method', 'become_user',
#                         'become_ask_pass', 'connection', 'timeout', 'poll_interval', 'check', 'diff']
#                    )

# options = OPTIONS(module_path=None, extra_vars=None, forks=5, become=None, become_method=None, become_user=None,
#                   become_ask_pass=None, connection='smart', timeout=10, poll_interval=15, check=False, diff=3)

class Options(object):
    """
    Plain attribute container used in place of the options object that
    Ansible's command-line OptParser would normally build.

    Every constructor argument is stored unchanged on the instance under the
    same name, so the object can be passed wherever an ``options`` object with
    these attributes is expected.
    """

    def __init__(self, module_path=None, extra_vars=None, forks=10, become=None, become_method=None, become_user=None,
                 become_ask_pass=None, connection='smart', timeout=2, poll_interval=15, check=False, diff=3,
                 listtasks=None, listhosts=None, listtags=None, syntax=None):
        """
        Store each option as an instance attribute.

        :param module_path: stored as ``self.module_path``
        :param extra_vars: stored as ``self.extra_vars``
        :param forks: stored as ``self.forks`` (default 10)
        :param become: stored as ``self.become``
        :param become_method: stored as ``self.become_method``
        :param become_user: stored as ``self.become_user``
        :param become_ask_pass: stored as ``self.become_ask_pass``
        :param connection: stored as ``self.connection`` (default 'smart')
        :param timeout: stored as ``self.timeout`` (default 2)
        :param poll_interval: stored as ``self.poll_interval`` (default 15)
        :param check: stored as ``self.check`` (default False)
        :param diff: stored as ``self.diff`` (default 3)
        :param listtasks: stored as ``self.listtasks``
        :param listhosts: stored as ``self.listhosts``
        :param listtags: stored as ``self.listtags``
        :param syntax: stored as ``self.syntax``
        """
        self.module_path = module_path
        self.extra_vars = extra_vars
        self.forks = forks
        self.become = become
        self.become_method = become_method
        self.become_user = become_user
        self.become_ask_pass = become_ask_pass
        self.connection = connection
        self.timeout = timeout
        self.poll_interval = poll_interval
        self.check = check
        # NOTE(review): ``diff`` looks like it should be a boolean flag; the
        # default of 3 is kept unchanged for backward compatibility.
        self.diff = diff
        self.listhosts = listhosts
        self.listtasks = listtasks
        self.listtags = listtags
        self.syntax = syntax

class ResultsCallback(CallbackBase):
    """
    Callback that collects the results of ad-hoc and playbook executions.

    Results are kept in three dicts: ``host_ok``, ``host_failed`` and
    ``host_unreachable``. The regular runner hooks key each entry by host name
    and overwrite any earlier entry for the same host; the async hooks store
    only the first result they see under a fixed key.
    """

    def __init__(self, *args, **kwargs):
        super(ResultsCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}
        # Set to True when a play reports that no hosts matched its pattern.
        self.no_hosts_matched = False

    def v2_runner_on_unreachable(self, result):
        """Record ``result`` as unreachable under its host name."""
        self.host_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Record ``result`` as successful under its host name."""
        self.host_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Record ``result`` as failed under its host name."""
        self.host_failed[result._host.get_name()] = result

    def v2_runner_on_async_poll(self, result):
        """Keep the first async poll result under the fixed key 'async_poll'."""
        self.host_ok.setdefault('async_poll', result)

    def v2_runner_on_async_ok(self, result):
        """Keep the first async success result under the fixed key 'async_ok'."""
        self.host_ok.setdefault('async_ok', result)

    def v2_runner_on_async_failed(self, result):
        """Keep the first async failure result under the fixed key 'async_failed'."""
        self.host_failed.setdefault('async_failed', result)

    def v2_playbook_on_no_hosts_matched(self):
        """Flag that a play matched no hosts."""
        # Deliberately does not touch host_unreachable: every value in that
        # dict must be a result object exposing ``_result``, and there is no
        # such object for this event. Inserting a placeholder entry would make
        # code that reads ``._result`` from each value fail.
        self.no_hosts_matched = True

class DynamicInventory(Inventory):
    """
    Build an Ansible inventory in memory from a Python list or dict instead
    of an inventory file.

    The populated inventory is exposed as the ``inventory`` attribute.
    """

    # Host dict keys that are mapped to ansible_ssh_* variables explicitly;
    # every other key is copied to the host as a plain variable.
    _CONNECTION_KEYS = ("hostname", "port", "username", "password")

    def __init__(self, resource, loader, variable_manager):
        """
        :param resource: either a list of host dicts (all placed into the
            group 'default_group'), or a dict of groups:
            {
                "group1": {
                    "hosts": [{"hostname": "10.0.0.0", "port": "22", "username": "test", "password": "pass"}, ...],
                    "vars": {"var1": value1, "var2": value2, ...}
                }
            }
        :param loader: loader passed through to ``Inventory``
        :param variable_manager: variable manager passed through to ``Inventory``
        """
        self.resource = resource
        # The inventory that gets populated is this separate, initially empty
        # instance; Inventory.__init__ is not invoked on ``self``.
        self.inventory = Inventory(loader=loader, variable_manager=variable_manager, host_list=[])
        self.gen_inventory()

    def Dynamic_add_group(self, hosts, groupname, groupvars=None):
        """
        Create a group, fill it with hosts and add it to the inventory.

        :param hosts: iterable of host dicts; ``None`` is treated as empty
        :param groupname: name of the group to create
        :param groupvars: optional dict of variables to set on the group
        :return: whatever ``self.inventory.add_group`` returns
        """
        new_group = Group(name=groupname)

        if groupvars:
            # .items() works on both Python 2 and Python 3.
            for key, value in groupvars.items():
                new_group.set_variable(key, value)

        for host in hosts or ():
            hostname = host.get("hostname")
            hostport = host.get("port")
            username = host.get("username")
            password = host.get("password")
            # ssh_key = host.get("ssh_key")
            general_host = Host(name=hostname, port=hostport)
            general_host.set_variable('ansible_ssh_host', hostname)
            general_host.set_variable('ansible_ssh_port', hostport)
            general_host.set_variable('ansible_ssh_user', username)
            general_host.set_variable('ansible_ssh_pass', password)
            # general_host.set_variable('ansible_ssh_private_key_file', ssh_key)

            # Any additional keys become ordinary host variables.
            for key, value in host.items():
                if key not in self._CONNECTION_KEYS:
                    general_host.set_variable(key, value)
            new_group.add_host(general_host)

        return self.inventory.add_group(new_group)

    def gen_inventory(self):
        """
        Populate the inventory from ``self.resource``.

        A list is added as the single group 'default_group'; a dict is added
        as one group per key. Any other type leaves the inventory empty.

        :return: None
        """
        if isinstance(self.resource, list):
            self.Dynamic_add_group(self.resource, 'default_group')
        elif isinstance(self.resource, dict):
            for groupname, hosts_and_vars in self.resource.items():
                self.Dynamic_add_group(hosts_and_vars.get("hosts"), groupname, hosts_and_vars.get("vars"))

class AnsibleAPI(object):
    """
    Wrapper that runs Ansible ad-hoc modules and playbooks against a
    dynamically generated inventory and collects the per-host results.
    """

    def __init__(self, resource):
        """
        :param resource: inventory description, in one of two forms.

            dict (one entry per group):
            {
                "group1": {
                    "hosts": [{"hostname": "10.0.0.0", "port": "22", "username": "test", "password": "pass"}, ...],
                    "vars": {"var1": value1, "var2": value2, ...}
                }
            }

            list (hosts are placed into the group 'default_group'):
            [{"hostname": "10.0.0.0", "port": "22", "username": "test", "password": "pass"}, ...]
        """
        self.variable_manager = VariableManager()
        self.loader = DataLoader()
        self.options = Options()
        self.passwords = dict()
        # Accumulates across calls to adhoc_result() / playbook_result().
        self.results_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
        self.resource = resource
        self.callback = ResultsCallback()
        self.inventory = DynamicInventory(self.resource, self.loader, self.variable_manager).inventory
        self.variable_manager.set_inventory(self.inventory)

        # Always define host_list so an unsupported resource type does not
        # surface later as an AttributeError inside PrivateAdHoc.
        self.host_list = []
        if isinstance(self.resource, list):
            # A list (not a lazy map object) on both Python 2 and Python 3.
            self.host_list = [str(host) for host in self.inventory.get_hosts(pattern='default_group')]
        elif isinstance(self.resource, dict):
            group_hosts = []
            for groupname in self.resource:
                group_hosts.extend(self.inventory.get_group(groupname).get_hosts())
            self.host_list = [str(host) for host in group_hosts]

    def PrivateAdHoc(self, module_name, module_args=''):
        """
        Run a single Ansible module ad-hoc against every host in host_list.

        :param module_name: name of the Ansible module to execute
        :param module_args: argument string passed to the module
        :return: the value returned by ``TaskQueueManager.run``
        """
        play_source = dict(
            name="Ansible Play",
            hosts=self.host_list,
            gather_facts='no',
            tasks=[dict(action=dict(module=module_name, args=module_args))]
        )
        play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)

        tqm = None
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=self.passwords,
                stdout_callback=self.callback
            )
            result = tqm.run(play)
        finally:
            # Clean up even when the run raises.
            if tqm is not None:
                tqm.cleanup()

        return result

    def PrivatePlaybook(self, playbooks):
        """
        Run an Ansible playbook and return its execution status.

        The executor is kept on ``self.executor`` so playbook_result() can
        read the collected results afterwards.

        :param playbooks: absolute path of the playbook entry file
        :return: the value returned by ``PlaybookExecutor.run``
        :raises SystemExit: when the file does not exist (no exit code given),
            or with code 127 when setting up or running the playbook raises
        """
        if not os.path.exists(playbooks):
            logger.error('No such file: %s', playbooks)
            # NOTE(review): exits without an error status although this is a
            # failure path; kept as-is for backward compatibility.
            sys.exit()

        try:
            self.executor = PlaybookExecutor(playbooks=[playbooks, ],
                                             inventory=self.inventory,
                                             variable_manager=self.variable_manager,
                                             loader=self.loader,
                                             options=self.options,
                                             passwords=self.passwords
                                             )
            # Route results into our collecting callback.
            self.executor._tqm._stdout_callback = self.callback
            result = self.executor.run()
        except Exception:
            # Log the traceback before exiting so the cause is not lost.
            logger.exception('Failed to run playbook: %s', playbooks)
            sys.exit(127)
        else:
            return result

    def _collect_results(self, callback):
        """Copy the results held by ``callback`` into ``self.results_raw`` and return it."""
        for host, result in callback.host_ok.items():
            self.results_raw['success'][host] = result._result

        for host, result in callback.host_failed.items():
            data = result._result
            # Prefer stderr; fall back to msg (None when neither is present).
            if 'stderr' in data:
                self.results_raw['failed'][host] = data['stderr']
            else:
                self.results_raw['failed'][host] = data.get('msg')

        for host, result in callback.host_unreachable.items():
            self.results_raw['unreachable'][host] = result._result.get('msg')

        return self.results_raw

    def playbook_result(self):
        """
        Collect the results of the last playbook run.

        Requires PrivatePlaybook() to have been called first, because the
        results are read from the callback attached to ``self.executor``.

        :return: dict with the keys 'success', 'failed' and 'unreachable',
            each mapping host name to result data
        """
        return self._collect_results(self.executor._tqm._stdout_callback)

    def adhoc_result(self):
        """
        Collect the results of ad-hoc execution from ``self.callback``.

        :return: dict with the keys 'success', 'failed' and 'unreachable',
            each mapping host name to result data
        """
        return self._collect_results(self.callback)

if __name__ == '__main__':
    # Manual smoke test: build an inventory with one group ("memory") of two
    # hosts, run a playbook against it and print the status and the results.

    # Ad-Hoc testing
    # ad_hoc_example = AnsibleAPI([
    #     dict(hostname='10.10.181.132', port=20003, username='root', password=123456),
    # ])
    # print(ad_hoc_example.PrivateAdHoc('service', module_args='name=crond state=reloaded'))
    # print(ad_hoc_example.adhoc_result())

    # Playbook testing
    playbook_example = AnsibleAPI(dict(memory=dict(
        hosts=[
            dict(hostname='10.10.181.132', port=20003, username='root', password=123456),
            dict(hostname='10.10.181.131', port=20004, username='root', password=123456),
        ],
        vars=dict(),
    )))
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(playbook_example.PrivatePlaybook('/Users/fanolee/PycharmProjects/AutoOPPlatform/monitor/ansible/playbooks/entry_files/memory/memory.yml'))
    print(playbook_example.playbook_result())

另外有需要云服务器可以了解下风纳云fengnayun.com,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


/template/Home/Dawn/PC/Static

选择风纳云,也许是您成就一番大事业的开端

注册账号