共享不同域名的登录凭据的程序编不出来

之前的 开发任务列表 第一条:

程序编不出来,好头大啊。 :exploding_head:

下面是半成品,总体思路是通过共享 cookies 实现不同域名的登录状态同步。

在本站页面嵌入 JS,如果用户已经登录,则在网页中嵌入 N 个不同域名(xjtu.app, xjtu.app, …)的形如 https://xjtu.app/share_sessions/universal.gif?username=admin&key=54904749a6af1f67efab7b257eb 的不可见图片。这一方法是受 Stack Exchange 启发:

使用 caddy分流/share_sessions/universal.gif 路径下的流量到 Python 写的脚本。

逻辑:读取 request 的 key(之前的 js 里生成的),如果是已经登录的域名(为了防止伪造 cookies 攻击,使用插件将 HMAC 过的用户登录状态保存在 logged_in cookie 里,见附件),则将 cookies 等信息存到 redis 里。其他域名此时会有 404,当用户访问其他页面再次请求时,设置 Access-Control-Allow-OriginAccess-Control-Allow-Credentials,并将已登录域名的 cookie 写到未登录的域名的储存空间,理论上此时工作就完成了。

然而现在的表现是

  1. 必须要在新标签页打开 universal.gif
  2. 可能会在一段时间后手动登录的域名的登录状态会变成退出

附件:

js in head.html
<!DOCTYPE html>
<script type="text/discourse-plugin" version="0.8">
    var div = document.createElement('div');
    div.className = 'universal_auth_div';
    div.id = 'op';
    div.style.cssText = 'position: absolute; z-index: 999; height: 16px; width: 16px; top:70px; display:none';
    // dec2hex :: Integer -> String
    // i.e. 0-255 -> '00'-'ff'
    function dec2hex (dec) {
      return dec.toString(16).padStart(2, "0")
    }

    // generateId :: Integer -> String
    function generateId (len) {
      var arr = new Uint8Array((len || 40) / 2)
      window.crypto.getRandomValues(arr)
      return Array.from(arr, dec2hex).join('')
    }

    function getCookie(cname) {
      let name = cname + "=";
      let decodedCookie = decodeURIComponent(document.cookie);
      let ca = decodedCookie.split(';');
      for(let i = 0; i <ca.length; i++) {
        let c = ca[i];
        while (c.charAt(0) == ' ') {
          c = c.substring(1);
        }
        if (c.indexOf(name) == 0) {
          return c.substring(name.length, c.length);
        }
      }
      return "";
    }

    const user = api.getCurrentUser();
    console.log(user);
    if(user) {
        var myStringArray = ["xjtu.app","xjtu.app","xjtu.app"];
        var arrayLength = myStringArray.length;
        var imgHtml = "";
        let cookie_key = getCookie("universal_auth");
        if (cookie_key == "") {
            cookie_key = generateId(128);
            document.cookie = "universal_auth=" + cookie_key;
        }
        for (var i = 0; i < arrayLength; i++) {
            imgHtml += "<img src=\"https://" + myStringArray[i] + "/share_sessions/universal.gif?username=" +
            user.username + "&key=" + cookie_key + "\"\n" +
                "         style=\"display: none\"\n" +
                "         crossorigin=\"use-credentials\"/>"
        }

        div.innerHTML = '<span>' + imgHtml + '</span>';

        document.body.appendChild(div);
    }

</script>
main.py
import base64
import datetime
import hashlib
import hmac
import json
import logging
import os
import pickle
from urllib import parse
from flask import Flask, request, abort, make_response, send_file
from redis import Redis
from config import TRUSTED_HOSTNAMES, name_of, AttrDict, Status

r = Redis()

app = Flask(__name__)

DIS_SHARE_SESSION_KEY = os.environ['DIS_SHARE_SESSION_KEY']


def rget(name, key):
    return r.get(name_of(name, key))


def rset(name, key, value):
    r.set(name_of(name, key), value)


def get_domain_policy(host):
    return host if host == 'xjtu.app' else None


@app.route('/share_sessions/universal.gif', methods=['GET', 'OPTIONS'])
def universal_auth():
    cookies = request.cookies
    host = request.host

    if request.method == 'OPTIONS':
        print(f'options {host}')
    else:
        print(f'get {host}')
    js_key = request.args.get('key', None)
    js_username = request.args.get('username', None)
    if js_key is None:
        return abort(404)

    if request.host not in TRUSTED_HOSTNAMES:
        return abort(404)

    status = rget(js_key, 'status')
    resp = make_response(send_file('static/1x1.gif', mimetype='image/gif'))

    if status is None:
        logged_in = cookies.get('logged_in')
        if logged_in is None:
            return abort(404)
        try:
            cookie_logged_in = parse.unquote(logged_in)
            cookie_logged_in = base64.b64decode(cookie_logged_in)
            cookie_logged_in = json.loads(cookie_logged_in)
            _json = cookie_logged_in['json']
            calculated_hmac = hmac.new(bytes(DIS_SHARE_SESSION_KEY, 'utf-8'),
                                       bytes(_json, 'utf-8'),
                                       hashlib.sha256).hexdigest()
        except Exception as e:
            logging.error(e)
            return abort(404)

        if calculated_hmac != cookie_logged_in['hmac']:
            return abort(404)

        user_info = json.loads(_json)
        del cookie_logged_in
        user_info = AttrDict(user_info)
        username = user_info.username

        if username != js_username:
            return abort(404)

        other_domains = TRUSTED_HOSTNAMES.copy()
        other_domains.remove(host)

        origin = f'https://{host}'

        rset(js_key, 'status', Status.PENDING.value)
        rset(js_key, 'cookies', pickle.dumps(cookies))
        rset(js_key, 'origin', origin)
        rset(js_key, 'other_domains', '|'.join(other_domains))

        return resp

    else:
        status = rget(js_key, 'status')

        # TODO: handle partial finished (user log in to other domain manually)
        if Status(int(status.decode())) == Status.FINISHED.value:
            return abort(404)

        origin = rget(js_key, 'origin')
        origin = origin.decode()
        cookies = rget(js_key, 'cookies')
        cookies = pickle.loads(cookies)
        other_domains = rget(js_key, 'other_domains')
        other_domains = other_domains.decode()
        other_domains = set(other_domains.split('|'))
        if host not in other_domains and request.method == 'GET':
            return abort(404)

        resp.headers.add('Access-Control-Allow-Origin', origin)
        resp.headers.add('Access-Control-Allow-Credentials', 'true')

        if request.method == 'GET':
            req_cookies = request.cookies
            flag_should_remove_host = []
            for k, v in cookies.items():
                print(f'{host}: set {k} to {v}')
                if k == '_t':
                    expire_date = datetime.datetime.now() + datetime.timedelta(days=60)  # 1440 hours
                elif k == '_forum_session':
                    expire_date = None
                else:
                    expire_date = None
                resp.set_cookie(k, v, domain=get_domain_policy(host), httponly=True, secure=True, samesite='None',
                                expires=expire_date)
                if req_cookies.get(k) == v:
                    flag_should_remove_host.append(True)
            if len(flag_should_remove_host) > 0 and all(flag_should_remove_host):
                logging.warning(f'{host}: removed')
                other_domains.remove(host)
            rset(js_key, 'other_domains', '|'.join(other_domains))
            if len(other_domains) == 0:
                rset(js_key, 'status', Status.FINISHED.value)

        print(f'host {host} finished')
        return resp
config.py
from enum import Enum, auto

TRUSTED_HOSTNAMES = '''
xjtu.app
xjtu.app
xjtu.app
# cf.xjtu.app
# ipv4.xjtu.app
# ipv6.xjtu.app
# us.xjtu.app
# jp.xjtu.app
# hk.xjtu.app
# direct.xjtu.app
'''
TRUSTED_HOSTNAMES = [i.strip() for i in TRUSTED_HOSTNAMES.strip().splitlines()
                     if len(i.strip()) != 0 and not i.strip().startswith('#')]
TRUSTED_HOSTNAMES = set(TRUSTED_HOSTNAMES)

name_of = lambda name, key: f'xjtumen-share-session-{name}-{key}'


class AttrDict(dict):
    __setattr__ = dict.__setitem__
    __getattr__ = dict.__getitem__

class Status(int, Enum):
    PENDING = auto()
    FINISHED = auto()

plugin.rb
class ExCurrentUserProvider < Auth::DefaultCurrentUserProvider
  TOKEN_COOKIX ||= "logged_in".freeze
  TOKEN_COOKIX2 ||= "universal_auth".freeze

  def log_on_user(user, session, cookies, opts = {})
    super

    require 'openssl' if !defined?(OpenSSL)
    require 'base64' if !defined?(Base64)

    payload = { username: user.username, user_id: user.id, avatar: user.avatar_template, group: user.title }
    hash_function = OpenSSL::Digest.new('sha256')
    payload_json = payload.to_json
    hmac = OpenSSL::HMAC.hexdigest(hash_function, SiteSetting.cookie_ui_key, payload_json)
    payload_sha = Digest::SHA256.hexdigest SiteSetting.cookie_ui_key
    payload2 = {json: payload_json, hmac: hmac, payload_sha: payload_sha}
    token = Base64.strict_encode64(payload2.to_json)
    cookies.permanent[TOKEN_COOKIX] = { value: token, httponly: true, secure: true, domain: :all, same_site: 'None' }
  end

  def log_off_user(session, cookies)
    super

    cookies[TOKEN_COOKIX] = { value: '', httponly: true, secure: true, domain: :all }
    cookies[TOKEN_COOKIX2] = { value: '', secure: true, domain: :all }
  end

end

3 Likes

感觉应该开发个插件,调原生 log_on_user API

SSO 你值得拥有

匿名回复和问答模式插件冲突