MOON
Server: Apache
System: Linux server2.shieldcogroup.com 3.10.0-1160.119.1.el7.tuxcare.els12.x86_64 #1 SMP Fri Nov 8 05:49:38 UTC 2024 x86_64
User: jacom (1029)
PHP: 8.1.34
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //usr/libexec/kcare/python/kcarectl/ipv6_support.py
# Copyright (c) Cloud Linux Software, Inc
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT

import json
import numbers
import os
import time

from . import config, constants, http_utils, log_utils, serverid, utils
from .py23 import json_loads_nstr

if False:  # pragma: no cover
    from typing import Optional  # noqa: F401

# from CLN sources:
# code: 0 -> valid license (for key based, or ip based)
# code: 1 -> trial license, unexpired
# code: 2 -> no valid license, but there is expired trial license
# code: 3 -> no valid or trial license

CLN_VALID_LICENSE = 0
CLN_TRIAL_ACTIVE_LICENSE = 1
CLN_NO_LICENSE = 3

CACHE_FILE = os.path.join(constants.PATCH_CACHE, 'ipv6_preference.json')
CACHE_TTL_SECONDS = 24 * 60 * 60


class IPProtoSelector(object):
    def is_ipv6_preferred(self):
        # type: () -> bool
        """
        Choose ipv6 if it is more suitable.
        Checks order:
        - check config values (it is faster) - eportal setup and FORCE_IPVx
        - then check each proto availability using HEAD requests
        - then check if we have server_id, it means we don't expect an ip license
        - and finally we need to check if there is an ip license
        """
        # each case is in a separate block for better coverage check
        if config.FORCE_IPV4:
            log_utils.logdebug('decided to use ipv4 because of config values')
            return False
        elif not config.PATCH_SERVER.endswith('kernelcare.com'):
            # eportal setup, we don't need to change urls
            log_utils.logdebug('decided to use ipv4 because of config values')
            return False
        elif config.FORCE_IPV6:
            log_utils.logdebug('decided to use ipv6 because of config values')
            return True

        # further checks are more expensive, use cached value if it is set
        cached = _read_cache()
        if cached is not None:
            log_utils.logdebug('decided to use {0} from on-disk cache'.format('ipv6' if cached else 'ipv4'))
            return cached

        result = None
        if not self._is_url_reachable(config.PATCH_SERVER_IPV6):
            log_utils.logdebug('decided to use ipv4 because ipv6 is not available')
            result = False
        elif not self._is_url_reachable(config.PATCH_SERVER):
            log_utils.logdebug('decided to use ipv6 because ipv4 is not available')
            result = True
        elif serverid.get_serverid():
            log_utils.logdebug('decided to use ipv4 because server id was found')
            result = False

        if result is not None:
            _write_cache(result)
            return result

        ipv4_license = self._get_cln_license(ipv6=False)
        ipv6_license = self._get_cln_license(ipv6=True)

        if ipv4_license == CLN_VALID_LICENSE:
            log_utils.logdebug('decided to use ipv4 because ipv4 license was found')
            result = False
        elif ipv6_license == CLN_VALID_LICENSE:
            log_utils.logdebug('decided to use ipv6 because ipv6 license was found')
            result = True
        elif ipv4_license == CLN_TRIAL_ACTIVE_LICENSE:
            log_utils.logdebug('decided to use ipv4 because ipv4 trial license was found')
            result = False
        elif ipv6_license == CLN_TRIAL_ACTIVE_LICENSE:
            log_utils.logdebug('decided to use ipv6 because ipv6 trial license was found')
            result = True
        else:
            # we don't have any license yet
            result = False

        _write_cache(result)
        return result

    @staticmethod
    def _is_url_reachable(url):
        # type: (str) -> bool
        request = http_utils.http_request(url, method='HEAD', auth_string=None)  # type: ignore[no-untyped-call]
        try:
            http_utils.urlopen(request, timeout=10, retry_on_500=False, retry_count=2)  # type: ignore[no-untyped-call]
            return True
        except Exception as e:
            log_utils.logdebug('error during HEAD request to {0}: {1}'.format(url, str(e)))
            return False

    @staticmethod
    def _get_cln_license(ipv6):
        # type: (bool) -> int
        base_url = config.REGISTRATION_URL_IPV6 if ipv6 else config.REGISTRATION_URL

        # a comment from auth.py:
        # do not retry in case of 500 from CLN!
        # otherwise, CLN will die in pain because of too many requests
        url = base_url + '/check.plain'
        content = utils.nstr(http_utils.urlopen(url, retry_on_500=False).read())  # type: ignore[no-untyped-call]
        info = utils.data_as_dict(content)
        if not info or not info.get('code'):
            log_utils.kcarelog.error('Unexpected CLN response: {0}'.format(content))
            return CLN_NO_LICENSE

        try:
            return int(info['code'])
        except ValueError:
            return CLN_NO_LICENSE


ip_proto_selector = IPProtoSelector()


def _read_cache():
    # type: () -> Optional[bool]
    content = utils.try_to_read(CACHE_FILE)
    if not content:
        return None

    try:
        data = json_loads_nstr(content)
    except (ValueError, TypeError):
        log_utils.logwarn('ipv6 preference cache: malformed json {0!r}'.format(content), print_msg=False)
        return None

    if not isinstance(data, dict):
        log_utils.logwarn('ipv6 preference cache: unexpected payload {0!r}'.format(data), print_msg=False)
        return None

    prefer_ipv6 = data.get('prefer_ipv6')
    if not isinstance(prefer_ipv6, bool):
        log_utils.logwarn('ipv6 preference cache: unexpected prefer_ipv6 {0!r}'.format(prefer_ipv6), print_msg=False)
        return None

    cached_ts = data.get('ts')
    # numbers.Integral covers int and Python 2 long (the latter matters
    # once a 32-bit Python 2 host crosses the 2038 sys.maxint boundary);
    # exclude bool explicitly since it would silently round-trip as 0/1
    if not isinstance(cached_ts, numbers.Integral) or isinstance(cached_ts, bool):
        log_utils.logwarn('ipv6 preference cache: malformed ts {0!r}'.format(cached_ts), print_msg=False)
        return None

    # negative age means the system clock jumped backwards (NTP correction etc.)
    # since we wrote the cache; treat it as a miss rather than as a fresh entry.
    age = time.time() - int(cached_ts)
    if age < 0 or age > CACHE_TTL_SECONDS:
        log_utils.logdebug('ipv6 preference cache: stale entry (age={0:.0f}s, ttl={1}s)'.format(age, CACHE_TTL_SECONDS))
        return None

    return prefer_ipv6


def _write_cache(result):
    # type: (bool) -> None
    data = {
        'prefer_ipv6': result,
        'ts': int(time.time()),
    }
    try:
        utils.atomic_write(CACHE_FILE, json.dumps(data), ensure_dir=True)
    except (OSError, IOError) as e:
        log_utils.logwarn('failed to write ipv6 preference cache: {0}'.format(e), print_msg=False)


def clear_cache():
    # type: () -> None
    """Drop the on-disk ipv6 preference cache."""
    try:
        os.unlink(CACHE_FILE)
    except OSError:
        pass


def get_patch_server():
    # type: () -> str
    return config.PATCH_SERVER_IPV6 if ip_proto_selector.is_ipv6_preferred() else config.PATCH_SERVER


def get_registration_url():
    # type: () -> str
    return config.REGISTRATION_URL_IPV6 if ip_proto_selector.is_ipv6_preferred() else config.REGISTRATION_URL