@ -174,6 +174,10 @@ ssoHelper = SSOHelper()
class Authenticator ( object ) :
""" Base authenticator class for authenticator implementations to subclass. """
# Cached Authenticator subclass instance, resolved via get().
_resolved : Optional [ Authenticator ] = None
_resolved_lock = threading . Lock ( )
def authenticate ( self , conn : HttpConn ) :
""" Adds authentication information to the HttpConn. """
raise NotImplementedError ( )
@ -193,47 +197,54 @@ class Authenticator(object):
environment . """
raise NotImplementedError ( )
@ static method
def get ( ) :
@ class method
def get ( cls ) :
""" Returns: (Authenticator) The identified Authenticator to use.
Probes the local system and its environment and identifies the
Authenticator instance to use .
The resolved Authenticator instance is cached as a class variable .
"""
use_new_auth = newauth . Enabled ( )
# Allow skipping SSOAuthenticator for local testing purposes.
skip_sso = newauth . SkipSSO ( )
authenticators : List [ Type [ Authenticator ] ]
if use_new_auth :
LOGGER . debug ( ' Authenticator.get: using new auth stack. ' )
authenticators = [
SSOAuthenticator ,
LuciContextAuthenticator ,
GceAuthenticator ,
LuciAuthAuthenticator ,
]
if skip_sso :
LOGGER . debug ( ' Authenticator.get: skipping SSOAuthenticator. ' )
authenticators = authenticators [ 1 : ]
else :
authenticators = [
LuciContextAuthenticator ,
GceAuthenticator ,
CookiesAuthenticator ,
]
for candidate in authenticators :
if candidate . is_applicable ( ) :
LOGGER . debug ( ' Authenticator.get: Selected %s . ' ,
candidate . __name__ )
return candidate ( )
auth_names = ' , ' . join ( a . __name__ for a in authenticators )
raise ValueError (
f " Could not find suitable authenticator, tried: [ { auth_names } ]. " )
with cls . _resolved_lock :
if ret := cls . _resolved :
return ret
use_new_auth = newauth . Enabled ( )
# Allow skipping SSOAuthenticator for local testing purposes.
skip_sso = newauth . SkipSSO ( )
if use_new_auth :
LOGGER . debug ( ' Authenticator.get: using new auth stack. ' )
authenticators = [
SSOAuthenticator ,
LuciContextAuthenticator ,
GceAuthenticator ,
LuciAuthAuthenticator ,
]
if skip_sso :
LOGGER . debug ( ' Authenticator.get: skipping SSOAuthenticator. ' )
authenticators = authenticators [ 1 : ]
else :
authenticators = [
LuciContextAuthenticator ,
GceAuthenticator ,
CookiesAuthenticator ,
]
for candidate in authenticators :
if candidate . is_applicable ( ) :
LOGGER . debug ( ' Authenticator.get: Selected %s . ' ,
candidate . __name__ )
ret = candidate ( )
cls . _resolved = ret
return ret
auth_names = ' , ' . join ( a . __name__ for a in authenticators )
raise ValueError (
f " Could not find suitable authenticator, tried: [ { auth_names } ]. "
)
class SSOAuthenticator ( Authenticator ) :