// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod sourced;
19

            
20
use hyper::{http::StatusCode, Request, Response};
21
use prometheus::{core::Collector, Encoder, TextEncoder};
22
use std::net::SocketAddr;
23

            
24
pub use prometheus::{
25
	self,
26
	core::{
27
		AtomicF64 as F64, AtomicI64 as I64, AtomicU64 as U64, GenericCounter as Counter,
28
		GenericCounterVec as CounterVec, GenericGauge as Gauge, GenericGaugeVec as GaugeVec,
29
	},
30
	exponential_buckets, Error as PrometheusError, Histogram, HistogramOpts, HistogramVec, Opts,
31
	Registry,
32
};
33
pub use sourced::{MetricSource, SourcedCounter, SourcedGauge, SourcedMetric};
34

            
35
/// Body type of the HTTP responses built by `request_metrics`; the test client in this file
/// uses the same type for its request bodies.
type Body = http_body_util::Full<hyper::body::Bytes>;
36

            
37
pub fn register<T: Clone + Collector + 'static>(
38
	metric: T,
39
	registry: &Registry,
40
) -> Result<T, PrometheusError> {
41
	registry.register(Box::new(metric.clone()))?;
42
	Ok(metric)
43
}
44

            
45
#[derive(Debug, thiserror::Error)]
46
pub enum Error {
47
	/// Hyper internal error.
48
	#[error(transparent)]
49
	Hyper(#[from] hyper::Error),
50

            
51
	/// Http request error.
52
	#[error(transparent)]
53
	Http(#[from] hyper::http::Error),
54

            
55
	/// i/o error.
56
	#[error(transparent)]
57
	Io(#[from] std::io::Error),
58

            
59
	#[error("Prometheus port {0} already in use.")]
60
	PortInUse(SocketAddr),
61
}
62

            
63
async fn request_metrics(
64
	req: Request<hyper::body::Incoming>,
65
	registry: Registry,
66
) -> Result<Response<Body>, Error> {
67
	if req.uri().path() == "/metrics" {
68
		let metric_families = registry.gather();
69
		let mut buffer = vec![];
70
		let encoder = TextEncoder::new();
71
		encoder.encode(&metric_families, &mut buffer).unwrap();
72

            
73
		Response::builder()
74
			.status(StatusCode::OK)
75
			.header("Content-Type", encoder.format_type())
76
			.body(Body::from(buffer))
77
			.map_err(Error::Http)
78
	} else {
79
		Response::builder()
80
			.status(StatusCode::NOT_FOUND)
81
			.body(Body::from("Not found."))
82
			.map_err(Error::Http)
83
	}
84
}
85

            
86
/// Initializes the metrics context, and starts an HTTP server
87
/// to serve metrics.
88
pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> Result<(), Error> {
89
	let listener = tokio::net::TcpListener::bind(&prometheus_addr)
90
		.await
91
		.map_err(|_| Error::PortInUse(prometheus_addr))?;
92

            
93
	init_prometheus_with_listener(listener, registry).await
94
}
95

            
96
/// Init prometheus using the given listener.
97
async fn init_prometheus_with_listener(
98
	listener: tokio::net::TcpListener,
99
	registry: Registry,
100
) -> Result<(), Error> {
101
	log::info!(target: "prometheus", "〽️ Prometheus exporter started at {}", listener.local_addr()?);
102

            
103
	let server = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new());
104
	let graceful = hyper_util::server::graceful::GracefulShutdown::new();
105

            
106
	loop {
107
		let io = match listener.accept().await {
108
			Ok((sock, _)) => hyper_util::rt::TokioIo::new(sock),
109
			Err(e) => {
110
				log::debug!(target: "prometheus", "Error accepting connection: {:?}", e);
111
				continue;
112
			},
113
		};
114

            
115
		let registry = registry.clone();
116

            
117
		let conn = server
118
			.serve_connection_with_upgrades(
119
				io,
120
				hyper::service::service_fn(move |req| request_metrics(req, registry.clone())),
121
			)
122
			.into_owned();
123
		let conn = graceful.watch(conn);
124

            
125
		tokio::spawn(async move {
126
			if let Err(err) = conn.await {
127
				log::debug!(target: "prometheus", "connection error: {:?}", err);
128
			}
129
		});
130
	}
131
}
132

            
133
#[cfg(test)]
mod tests {
	use super::*;
	use http_body_util::BodyExt;
	use hyper::Uri;
	use hyper_util::{client::legacy::Client, rt::TokioExecutor};

	/// Name of the counter registered by the test and looked up in the scraped output.
	const METRIC_NAME: &str = "test_test_metric_name_test_test";

	/// Starts the exporter on an OS-assigned local port, requests `/metrics` over HTTP and
	/// checks that the registered counter shows up with value `0`.
	#[tokio::test]
	async fn prometheus_works() {
		let listener =
			tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("Creates listener");
		let local_addr = listener.local_addr().expect("Returns the local addr");

		let registry = Registry::default();
		let counter = prometheus::Counter::new(METRIC_NAME, "yeah").expect("Creates test counter");
		register(counter, &registry).expect("Registers the test metric");

		// Serve in the background; the test task acts as the HTTP client.
		tokio::spawn(init_prometheus_with_listener(listener, registry));

		let client = Client::builder(TokioExecutor::new()).build_http::<Body>();

		let url = format!("http://{}/metrics", local_addr);
		let uri = Uri::try_from(&url).expect("Parses URI");
		let response = client.get(uri).await.expect("Requests metrics");

		assert!(response.status().is_success());

		let bytes =
			response.into_body().collect().await.expect("Failed to read HTTP body").to_bytes();
		let text = String::from_utf8(bytes.to_vec()).expect("Converts body to String");

		let expected_sample = format!("{} 0", METRIC_NAME);
		assert!(text.contains(&expected_sample));
	}
}