此示例Python使用Selenium调用Chrome浏览器并通过代理进行自动化测试。
import time
import string
import zipfile
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
targetURL = "http://myip.ipip.net" # 访问的目标站点
proxyHost = "61.171.76.145" # 代理IP地址
proxyPort = "50353" # 代理IP端口号
authKey = "x" # 代理IP的AuthKey
password = "x" # 代理IP的AuthPwd
def create_proxy_auth_extension(proxy_host, proxy_port, proxy_username, proxy_password, scheme='http',
plugin_path=None):
if plugin_path is None:
plugin_path = r'./{}_{}_qgnet_proxyauth_plugin.zip'.format(proxy_username, proxy_password)
manifest_json = """
{
"version": "1.0.0",
"manifest_version": 2,
"name": "Chrome Proxy",
"permissions": [
"proxy",
"tabs",
"unlimitedStorage",
"storage",
"<all_urls>",
"webRequest",
"webRequestBlocking"
],
"background": {
"scripts": ["background.js"]
},
"minimum_chrome_version":"22.0.0"
}
"""
background_js = string.Template(
"""
var config = {
mode: "fixed_servers",
rules: {
singleProxy: {
scheme: "${scheme}",
host: "${host}",
port: parseInt(${port})
},
bypassList: ["localhost"]
}
};
chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
function callbackFn(details) {
return {
authCredentials: {
username: "${username}",
password: "${password}"
}
};
}
chrome.webRequest.onAuthRequired.addListener(
callbackFn,
{urls: ["<all_urls>"]},
['blocking']
);
"""
).substitute(
host=proxy_host,
port=proxy_port,
username=proxy_username,
password=proxy_password,
scheme=scheme,
)
with zipfile.ZipFile(plugin_path, 'w') as zp:
zp.writestr("manifest.json", manifest_json)
zp.writestr("background.js", background_js)
return plugin_path
if __name__ == '__main__':
# 此处指定您的webdriver路径,版本需要跟您所使用的Chrome版本一致,
# 下载地址https://registry.npmmirror.com/binary.html?path=chromedriver/
driver_location = "./chromedriver/chromedriver_v106_win.exe"
proxy_auth_plugin_path = create_proxy_auth_extension(
proxy_host=proxyHost,
proxy_port=proxyPort,
proxy_username=authKey,
proxy_password=password)
option = webdriver.ChromeOptions()
option.add_argument("--start-maximized") # 窗口最大化运行
option.add_extension(proxy_auth_plugin_path) # 添加proxy插件
# 此处selenium版本为4.8.0
driver = webdriver.Chrome(service=Service(driver_location), options=option)
driver.get(targetURL)
time.sleep(100)
driver.quit()
此示例Python使用Selenium调用Chrome浏览器并通过代理进行自动化测试。
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
targetURL = "http://myip.ipip.net" #访问的目标站点
proxyAddr = "您的代理IP:端口号"
if __name__ == '__main__':
browser_location = r".\Chrome\chrome.exe" #指定浏览器路径位置
driver_location = r".\Chrome\chromedriver.exe" #指定Driver路径位置
option = webdriver.ChromeOptions()
option.binary_location = browser_location #设置浏览器位置
option.add_argument("--start-maximized") #窗口最大化运行
option.add_argument('--proxy-server=%(server)s' % {"server": proxyAddr})
driver = webdriver.Chrome(service=Service(driver_location), options=option)
driver.get(targetURL)
print(driver.page_source)
import base64
import time
import requests
from requests.adapters import HTTPAdapter
auth_key = "请改成您的Key"
password = "请改成您的AuthPwd"
tunnel_server = "http://请改成您的隧道地址" # 如:tunnel3.qg.net:19263
target_url = "https://ip.cn/api/index?ip=&type=0" # 要访问的目标地址
proxy_headers = {}
proxy = {
"http": tunnel_server,
"https": tunnel_server
}
def encode_authorization(key, passwd):
# python 使用 bytes 类型进行 base64 编码
basic_str = bytes("%s:%s" % (key, passwd), "ascii")
# 得到的返回值也是 bytes 类型,所以需要再 decode 为字符串
return "Basic %s" % base64.b64encode(basic_str).decode("utf-8")
def reset_tunnel_proxy_headers():
global proxy_headers
proxy_headers = {
tunnel_server: {
"Proxy-Authorization": encode_authorization(auth_key, password)
}
}
def update_tunnel_proxy_headers(key, val):
global proxy_headers
proxy_headers[tunnel_server][key] = val
def new_session():
adapter = TunnelProxyAdapter()
se = requests.Session()
se.mount('https://', adapter)
se.mount('http://', adapter)
return se
class TunnelProxyAdapter(requests.adapters.HTTPAdapter):
def proxy_headers(self, p):
if p in proxy_headers:
print("session with headers:", proxy_headers[p])
return proxy_headers[p]
else:
return None
def multi_channel_tunnel():
"""
结果类似:
request on multi channel
request id: 1 , channel id: channel-1, code: 200, result: 183.155.88.224
request id: 2 , channel id: channel-2, code: 200, result: 125.112.38.153
request id: 3 , channel id: channel-3, code: 200, result: 183.155.89.125
request id: 4 , channel id: channel-4, code: 200, result: 49.71.121.169
request id: 5 , channel id: channel-5, code: 200, result: 115.210.67.220
request id: 6 , channel id: channel-6, code: 200, result: 36.25.41.178
request id: 7 , channel id: channel-7, code: 200, result: 180.125.162.116
request id: 8 , channel id: channel-8, code: 200, result: 140.250.150.158
request id: 9 , channel id: channel-9, code: 200, result: 121.227.102.227
request id: 10, channel id: channel-10, code: 200, result: 49.88.106.198
request id: 1 , channel id: channel-1, code: 200, result: 183.155.88.224
request id: 2 , channel id: channel-2, code: 200, result: 125.112.38.153
request id: 3 , channel id: channel-3, code: 200, result: 183.155.89.125
request id: 4 , channel id: channel-4, code: 200, result: 49.71.121.169
request id: 5 , channel id: channel-5, code: 200, result: 115.210.67.220
request id: 6 , channel id: channel-6, code: 200, result: 36.25.41.178
request id: 7 , channel id: channel-7, code: 200, result: 180.125.162.116
request id: 8 , channel id: channel-8, code: 200, result: 140.250.150.158
request id: 9 , channel id: channel-9, code: 200, result: 121.227.102.227
request id: 10, channel id: channel-10, code: 200, result: 49.88.106.198
"""
print("request on multi channel")
reset_tunnel_proxy_headers()
for i in range(1, 11):
se = new_session()
chan_id = "channel-%s" % i
update_tunnel_proxy_headers("Proxy-TunnelID", chan_id)
resp = se.get(target_url, proxies=proxy, headers={"Connection": "close"})
print("request id: %-2s, channel id: %s, code: %s, result: %s" % (i, chan_id, resp.status_code, resp.text))
time.sleep(10)
# 因为固定时长为1分钟,所以在1分钟内继续使用已有通道,仍是之前的IP
for i in range(1, 11):
se = new_session()
chan_id = "channel-%s" % i
update_tunnel_proxy_headers("Proxy-TunnelID", chan_id)
resp = se.get(target_url, proxies=proxy, headers={"Connection": "close"})
print("request id: %-2s, channel id: %s, code: %s, result: %s" % (i, chan_id, resp.status_code, resp.text))
if __name__ == "__main__":
multi_channel_tunnel()
package main
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"sync"
"time"
)
/** 返回内容
第一次循环
当前 IP:121.230.91.188 来自于:中国 江苏 泰州 电信
当前 IP:60.184.205.115 来自于:中国 浙江 丽水 电信
当前 IP:125.112.205.149 来自于:中国 浙江 金华 电信
当前 IP:60.184.108.175 来自于:中国 浙江 丽水 电信
当前 IP:58.214.87.31 来自于:中国 江苏 无锡 电信
当前 IP:183.143.131.24 来自于:中国 浙江 湖州 电信
当前 IP:42.53.99.119 来自于:中国 辽宁 锦州 联通
当前 IP:59.60.142.70 来自于:中国 福建 漳州 电信
当前 IP:114.226.175.159 来自于:中国 江苏 常州 电信
当前 IP:123.162.194.223 来自于:中国 河南 驻马店 电信
max channel reached
第二次循环
当前 IP:114.226.175.159 来自于:中国 江苏 常州 电信
当前 IP:60.184.205.115 来自于:中国 浙江 丽水 电信
当前 IP:121.230.91.188 来自于:中国 江苏 泰州 电信
当前 IP:125.112.205.149 来自于:中国 浙江 金华 电信
当前 IP:123.162.194.223 来自于:中国 河南 驻马店 电信
当前 IP:58.214.87.31 来自于:中国 江苏 无锡 电信
当前 IP:183.143.131.24 来自于:中国 浙江 湖州 电信
当前 IP:60.184.108.175 来自于:中国 浙江 丽水 电信
当前 IP:59.60.142.70 来自于:中国 福建 漳州 电信
当前 IP:42.53.99.119 来自于:中国 辽宁 锦州 联通
第二次循环返回的IP与第一次循环相同,因为goroutine是异步的,所以返回顺序和第一次不一致
myip.ipip.net服务器可能比较容易失败,用户可以自己找一个其他获取客户端IP的服务器来测试
*/
// 固定时长多通道隧道模式
func main() {
authKey := "请改成您的Key" //固定时长1分钟的隧道,通道数10
password := "请改成您的AuthPwd"
proxyServer := "请改成您的隧道地址" //如:tunnel3.qg.net:19263
targetURL := "https://ip.cn/api/index?ip=&type=0"
rawURL := fmt.Sprintf("http://%s:%s@%s", authKey, password, proxyServer)
proxyUrl, _ := url.Parse(rawURL)
wg := sync.WaitGroup{}
wg.Add(11)
// 十个通道分别使用不同的十个IP,第十一次会返回错误,通道数超出
for i := 0; i < 11; i++ {
go func(index int) {
defer wg.Done()
client := http.Client{
Transport: &http.Transport{
ProxyConnectHeader: http.Header{
"Proxy-TunnelID": []string{fmt.Sprintf("channel-%d", index)}, // 指定通道ID
},
Proxy: http.ProxyURL(proxyUrl),
},
}
req, _ := http.NewRequest("GET", targetURL, nil)
rsp, err := client.Do(req)
if err != nil {
fmt.Printf("request failed: %s\n", err)
return
}
defer rsp.Body.Close()
body, err := ioutil.ReadAll(rsp.Body)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(string(body))
}
}(i)
}
wg.Wait()
// 因为固定时长为1分钟,所以在1分钟内继续使用已有通道,仍是之前的IP
time.Sleep(time.Second * 10)
wg.Add(10)
for i := 0; i < 10; i++ {
go func(index int) {
defer wg.Done()
client := http.Client{
Transport: &http.Transport{
ProxyConnectHeader: http.Header{
"Proxy-TunnelID": []string{fmt.Sprintf("channel-%d", index)}, // 指定通道ID
},
Proxy: http.ProxyURL(proxyUrl),
},
}
req, _ := http.NewRequest("GET", targetURL, nil)
rsp, err := client.Do(req)
if err != nil {
fmt.Printf("request failed: %s\n", err)
return
}
defer rsp.Body.Close()
body, err := ioutil.ReadAll(rsp.Body)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(string(body))
}
}(i)
}
wg.Wait()
}
import requests
targetURL = "https://ip.cn/api/index?ip=&type=0" //要访问的目标地址
proxyAddr = "请改成您的隧道地址" //如:tunnel3.qg.net:19263
authKey = "请改成您的Key"
password = "请改成您的AuthPwd"
# 账密模式
proxyUrl = "http://%(user)s:%(password)s@%(server)s" % {
"user": authKey,
"password": password,
"server": proxyAddr,
}
proxies = {
"http": proxyUrl,
"https": proxyUrl,
}
for i in range(10):
resp = requests.get(targetURL, proxies=proxies)
print(resp.text)
import base64
import time
import requests
from requests.adapters import HTTPAdapter
auth_key = "请改成您的Key"
password = "请改成您的AuthPwd"
tunnel_server = "http://请改成您的隧道地址" //如:tunnel3.qg.net:19263
target_url = "https://ip.cn/api/index?ip=&type=0" // 要访问的目标地址
proxy_headers = {}
proxy = {
"http": tunnel_server,
"https": tunnel_server
}
def encode_authorization(key, passwd):
# python 使用 bytes 类型进行 base64 编码
basic_str = bytes("%s:%s" % (key, passwd), "ascii")
# 得到的返回值也是 bytes 类型,所以需要再 decode 为字符串
return "Basic %s" % base64.b64encode(basic_str).decode("utf-8")
def reset_tunnel_proxy_headers():
global proxy_headers
proxy_headers = {
tunnel_server: {
"Proxy-Authorization": encode_authorization(auth_key, password)
}
}
def update_tunnel_proxy_headers(key, val):
global proxy_headers
proxy_headers[tunnel_server][key] = val
def new_session():
adapter = TunnelProxyAdapter()
se = requests.Session()
se.mount('https://', adapter)
se.mount('http://', adapter)
return se
class TunnelProxyAdapter(requests.adapters.HTTPAdapter):
def proxy_headers(self, p):
if p in proxy_headers:
print("session with headers:", proxy_headers[p])
return proxy_headers[p]
else:
return None
def normal_tunnel():
"""
结果类似:
request on normal mode
session with headers: {'Proxy-Authorization': 'Basic xxxx'}
request id: 1, code: 200, result: 140.250.149.229
"""
reset_tunnel_proxy_headers()
print("request on normal mode")
resp = new_session().get(target_url, proxies=proxy)
print("request id: 1, code: %s, result: %s" % (resp.status_code, resp.text))
def mark_tunnel():
"""
结果类似:
request with mark
session with headers: {'Proxy-Authorization': 'Basic xxxx', 'Proxy-TunnelID': 'channel-1', 'Proxy-TTL': 10}
request id: 1 , code: 200, result: 183.166.118.48
request id: 2 , code: 200, result: 183.166.118.48
request id: 3 , code: 200, result: 183.166.118.48
request id: 4 , code: 200, result: 183.166.118.48
request id: 5 , code: 200, result: 183.166.118.48
request id: 6 , code: 200, result: 183.166.118.48
request id: 7 , code: 200, result: 183.142.59.203
request id: 8 , code: 200, result: 183.142.59.203
request id: 9 , code: 200, result: 183.142.59.203
request id: 10, code: 200, result: 123.54.235.89
"""
reset_tunnel_proxy_headers()
update_tunnel_proxy_headers("Proxy-TunnelID", "channel-1")
update_tunnel_proxy_headers("Proxy-TTL", 10)
se = new_session()
print("request with mark")
for i in range(1, 12):
resp = se.get(target_url, proxies=proxy, headers={"Connection": "close"})
print("request id: %-2s, code: %s, result: %s" % (i, resp.status_code, resp.text))
time.sleep(1)
if __name__ == "__main__":
normal_tunnel()
mark_tunnel()
package main
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"sync"
"time"
)
func main() {
authKey := "请改成您的Key"
password := "请改成您的AuthPwd"
proxyServer := "请改成您的隧道地址" //如:tunnel3.qg.net:19263
targetURL := "https://ip.cn/api/index?ip=&type=0"
rawURL := fmt.Sprintf("http://%s:%s@%s", authKey, password, proxyServer)
proxyUrl, _ := url.Parse(rawURL)
client := http.Client{
Transport: &http.Transport{
Proxy: http.ProxyURL(proxyUrl),
},
}
req, _ := http.NewRequest("GET", targetURL, nil)
rsp, err := client.Do(req)
if err != nil {
fmt.Printf("request failed: %s\n", err)
return
}
defer rsp.Body.Close()
body, err := ioutil.ReadAll(rsp.Body)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(string(body))
}
}
package main
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"sync"
"time"
)
/** 返回内容
当前 IP:114.219.115.191 来自于:中国 江苏 苏州 电信
当前 IP:114.219.115.191 来自于:中国 江苏 苏州 电信
当前 IP:114.219.115.191 来自于:中国 江苏 苏州 电信
当前 IP:114.219.115.191 来自于:中国 江苏 苏州 电信
当前 IP:114.219.115.191 来自于:中国 江苏 苏州 电信
当前 IP:114.219.115.191 来自于:中国 江苏 苏州 电信
当前 IP:114.219.115.191 来自于:中国 江苏 苏州 电信
当前 IP:114.219.115.191 来自于:中国 江苏 苏州 电信
当前 IP:114.219.115.191 来自于:中国 江苏 苏州 电信
当前 IP:114.219.115.191 来自于:中国 江苏 苏州 电信
当前 IP:14.118.211.116 来自于:中国 广东 江门 电信
*/
func main() {
authKey := "请改成您的Key"
password := "请改成您的AuthPwd"
proxyServer := "请改成您的隧道的地址" //如:tunnel3.qg.net:19263
targetURL := "https://ip.cn/api/index?ip=&type=0"
rawURL := fmt.Sprintf("http://%s:%s@%s", authKey, password, proxyServer)
proxyUrl, _ := url.Parse(rawURL)
client := http.Client{
Transport: &http.Transport{
ProxyConnectHeader: http.Header{
"Proxy-TunnelID": []string{"channel-1"}, // 在CONNECT连接中新增Proxy-TunnelID打标记
"Proxy-TTL": []string{"10"}, // Proxy-TTL指定该标记IP的存活时间
},
Proxy: http.ProxyURL(proxyUrl),
},
}
// 因为标记的存活时间是10s,所以循环11次中有10次的IP是一样的。
// 第11次因为标记的IP到期了,系统会自动替换IP
wg := sync.WaitGroup{}
wg.Add(11)
for i := 0; i < 11; i++ {
go func() {
defer wg.Done()
req, _ := http.NewRequest("GET", targetURL, nil)
rsp, err := client.Do(req)
if err != nil {
fmt.Printf("request failed: %s\n", err)
return
}
defer rsp.Body.Close()
body, err := ioutil.ReadAll(rsp.Body)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(string(body))
}
}()
time.Sleep(time.Second)
}
wg.Wait()
}
const http = require("http");
const url = require("url");
const targetURL = url.parse("https://ip.cn/api/index?ip=&type=0");
const proxyIp = "您的代理IP";
const proxyPort = 端口号;
const authKey = "请改成您的Key";
const password = "请改成您的AuthPwd";
const base64 = new Buffer.from(authKey + ":" + password).toString("base64");
const options = {
host: proxyIp,
port: proxyPort,
path: targetURL,
method: "GET",
headers: {
"Host": urlParsed.hostname,
"Proxy-Authorization": "Basic " + base64
}
};
http.request(options, function (resp) {
console.log("response status code: " + resp.statusCode);
resp.pipe(process.stdout);
}).on("error", function (err) {
console.log("request failed: " + err);
}).end();
const request = require("request");
const targetUrl = "https://ip.cn/api/index?ip=&type=0";
const proxyIp = "您的代理IP";
const proxyPort = 端口号;
const authKey = "请改成您的Key";
const password = "请改成您的AuthPwd";
const proxyUrl = "http://" + authKey + ":" + password + "@" + proxyIp + ":" + proxyPort;
const req = request.defaults({'proxy': proxyUrl});
const options = {
url: targetUrl,
headers: {}
};
req.get(options, function (err, resp, body) {
if (err) {
return console.log(err);
}
console.log("response status code: " + resp.statusCode);
console.log("response body: " + body);
}).on("error", function (err) {
console.log("request failed: " + err);
});
const request = require("superagent");
require("superagent-proxy")(request);
const targetUrl = "https://ip.cn/api/index?ip=&type=0";
const proxyIp = "您的代理IP";
const proxyPort = 端口号;
const authKey = "请改成您的Key";
const password = "请改成您的AuthPwd";
const proxyUrl = "http://" + authKey + ":" + password + "@" + proxyIp + ":" + proxyPort;
request.get(targetUrl).proxy(proxyUrl).end(function onResponse(err, resp) {
if (err) {
return console.log(err);
}
console.log("response status code: " + resp.statusCode);
console.log("response body: " + resp.text);
});
const axios = require("axios")
const {HttpsProxyAgent} = require("https-proxy-agent")
const targetUrl = "http://ip.cn/api/index?ip=&type=0"
const proxyIp = "您的代理IP"
const proxyPort = 端口号
const authKey = "请改成您的Key"
const password = "请改成您的AuthPwd"
const proxy = new HttpsProxyAgent(`http://${authKey}:${password}@${proxyIp}:${proxyPort}`)
axios.get(targetUrl, {
httpAgent: proxy,
httpsAgent: proxy,
}).then(function (response) {
console.log("response body: " + response.data)
}).catch(function (error) {
console.log("request failed: " + error)
}).finally(function () {
console.log("request finished.")
})
// 如果目标站是HTTPS,则需要使用下面的代码进行代理请求
let httpsProxyAgent = require('https-proxy-agent');
var agent = new httpsProxyAgent(`http://${authKey}:${password}@${proxyIp}:${proxyPort}`);
var config = {
url: "https://ip.cn/api/index?ip=&type=0",
httpsAgent: agent
}
axios.request(config)
.then(function(response) {
console.log("response body: " + response.data)
}).catch(function(error) {
console.log("request failed: " + error)
})
string targetUrl = "https://ip.cn/api/index?ip=&type=0";
string proxyIp = "您的代理IP";
string proxyPort = "端口号";
string authKey = "请改成您的Key";
string password = "请改成您的AuthPwd";
WebProxy proxy = new WebProxy(string.Format("{0}:{1}", proxyIp, proxyPort), true);
proxy.Credentials = new NetworkCredential(authKey, password);
ServicePointManager.Expect100Continue = false;
var request = WebRequest.Create(targetUrl) as HttpWebRequest;
request.AllowAutoRedirect = true;
request.KeepAlive = true;
request.Method = "GET";
request.Proxy = proxy;
request.Timeout = 10000;
request.ServicePoint.ConnectionLimit = 16;
using (var resp = request.GetResponse() as HttpWebResponse)
using (var reader = new StreamReader(resp.GetResponseStream(), Encoding.UTF8)){
string htmlStr = reader.ReadToEnd();
}
#!/bin/bash
targetURL="https://ip.cn/api/index?ip=&type=0"
proxyAddr="您的代理IP:端口号"
authKey="请改成您的Key"
password="请改成您的AuthPwd"
curl -x ${authKey}:${password}@${proxyAddr} ${targetURL} -vvvv
package main
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
)
func main() {
authKey := "请改成您的Key"
password := "请改成您的AuthPwd"
proxyServer := "您的代理IP:端口号"
targetURL := "https://ip.cn/api/index?ip=&type=0"
rawURL := fmt.Sprintf("http://%s:%s@%s", authKey, password, proxyServer)
proxyUrl, err := url.Parse(rawURL)
if err != nil {
panic(err)
}
client := http.Client{
Transport: &http.Transport{
Proxy: http.ProxyURL(proxyUrl),
},
}
req, _ := http.NewRequest("GET", targetURL, nil)
rsp, err := client.Do(req)
if err != nil {
fmt.Printf("request failed: %s\n", err)
return
}
defer rsp.Body.Close()
body, err := ioutil.ReadAll(rsp.Body)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(string(body))
}
}