1use std::path::{Path, PathBuf};
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use alloy_node_bindings::{Anvil, AnvilInstance};
7use barstrainer_client::SigningIdentity;
8use lyquor_primitives::{LyquidNumber, debug_struct_name};
9use serde_json::{Map, Value};
10use tokio::signal::unix::{SignalKind, signal};
11use tokio_util::sync::CancellationToken;
12
13use lyquor_api::{
14 anyhow::{self, Context as _Context},
15 config::{DB, NodeConfig},
16 kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
17 profile::{LyquorProfile, NetworkType},
18 subkey_builder,
19};
20use lyquor_db::{MemDB, RocksDB};
21use lyquor_image_store::{DirectoryOciStore, DirectoryRepo, LyquidImageRepo, OciDistributionStore};
22use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
23use lyquor_jsonrpc::types::{EthGetChainId, EthGetChainIdResp};
24use lyquor_lib::db::{DBFlusher, DBFlusherHandle};
25use lyquor_lib::dns_updater::publish_external_host_dns;
26use lyquor_lib::image_resolver::ImageResolver;
27use lyquor_lib::lyquid_log::LyquidLog;
28use lyquor_lib::side_effects::{SideEffectRuntime, build_side_effect_runtime};
29use lyquor_lib::{http, lyquid, utils};
30use lyquor_primitives::{Address, Bytes, LyteLog};
31use lyquor_seq::{SequenceBackend, eth, fco::FCO};
32
33subkey_builder!(RootSubKey(
34 ([0x01])-lyquid() => Key,
35 ([0x02])-fco() => Key,
36 ([0x03])-log() => Key,
37 ([0x04])-side_effect_store() => Key,
38));
39
40fn subkey() -> &'static RootSubKey {
41 static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
42 KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
43}
44
45const ANVIL_STATE_FILENAME: &str = "anvil-state.json";
46
47static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
48
49async fn setup_eth_backend(
50 profile: &LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
51 shutdown: CancellationToken,
52) -> anyhow::Result<eth::Backend> {
53 let finality_tag: lyquor_jsonrpc::types::BlockNumber =
54 serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
55
56 Ok(eth::Backend::new(
57 eth::Config::builder()
58 .finality_tag(finality_tag)
59 .registry(reg)
60 .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
61 .build(),
62 jsonrpc_client,
63 shutdown,
64 ))
65}
66
67#[derive(Default)]
68struct Subsystems {
69 eth_backend: Option<lyquor_seq::eth::Backend>,
70 fco: Option<FCO>,
71 log: Option<LyquidLog>,
72 pool: Option<lyquid::LyquidPool>,
73 api: Option<http::HttpServer>,
74 db: Option<DBFlusherHandle>,
75 discovery: Option<lyquor_lib::discovery::LyquidDiscovery>,
76 client: Option<ClientHandle>,
77}
78debug_struct_name!(Subsystems);
79
80impl Subsystems {
81 #[tracing::instrument(level = "trace")]
82 async fn stop(&mut self, shutdown: CancellationToken) {
83 if let Some(api) = self.api.take() {
84 api.stop();
85 }
86 if let Some(eth_backend) = self.eth_backend.take() {
87 eth_backend.shutdown().await;
88 }
89 if let Some(discovery) = self.discovery.take() {
90 discovery.shutdown().await;
91 }
92 if let Some(pool) = self.pool.take() {
93 pool.shutdown().await;
94 }
95 if let Some(db) = self.db.take() {
96 db.stop().await;
97 }
98 shutdown.cancel();
99 if let Some(client) = self.client.take() {
100 client.wait_for_shutdown().await;
101 }
102 if let Some(fco) = self.fco.take() {
103 fco.shutdown().await;
104 }
105 if let Some(log) = self.log.take() {
106 log.shutdown().await;
107 }
108 }
109}
110
111struct Node {
112 sys: Subsystems,
113 config: Arc<NodeConfig>,
114 profile: Arc<LyquorProfile>,
115 seq_eth_url: String,
116 http_tls: Option<http::httptls::HttpTls>,
117 started_anvil: bool,
118 network: Option<Arc<lyquor_net::hub::Hub>>,
119 shutdown: CancellationToken,
120}
121
122lyquor_primitives::debug_struct_name!(Node);
123
124impl Node {
125 #[tracing::instrument(level = "trace", skip_all)]
126 async fn start(mut self) -> anyhow::Result<Self> {
127 self.sys = Self::init(
128 self.config.clone(),
129 self.profile.clone(),
130 self.seq_eth_url.clone(),
131 self.http_tls.clone(),
132 self.started_anvil,
133 self.network.take().expect("Node: Network not found."),
134 self.shutdown.clone(),
135 )
136 .await?;
137 Ok(self)
138 }
139
140 #[tracing::instrument(level = "trace", skip_all)]
141 async fn shutdown(&mut self) {
142 self.sys.stop(self.shutdown.clone()).await;
143 }
144
145 #[tracing::instrument(level = "trace", skip_all)]
146 async fn init(
147 config: Arc<NodeConfig>, profile: Arc<LyquorProfile>, seq_eth_url: String,
148 http_tls: Option<http::httptls::HttpTls>, started_anvil: bool, network: Arc<lyquor_net::hub::Hub>,
149 shutdown: CancellationToken,
150 ) -> anyhow::Result<Subsystems> {
151 let mut sys = Subsystems::default();
152
153 let jsonrpc_client = ClientConfig::builder()
155 .url(seq_eth_url.parse()?)
156 .build()
157 .into_client(shutdown.child_token());
158 sys.client = Some(jsonrpc_client.clone());
159 let sequence_backend_chain_id: u64 = jsonrpc_client
160 .request::<EthGetChainId, EthGetChainIdResp>(EthGetChainId)
161 .await
162 .context("Failed to query sequence backend chain ID.")?
163 .0
164 .to();
165 let sequence_backend_bartender_addr =
166 Address::from_str(profile.bartender_addr()).context("Invalid bartender address in profile.")?;
167 let resolved_db_dir = config.resolved_db_dir();
168 let resolved_image_repo_dir = config.resolved_image_repo_dir();
169
170 let (store, lyq_store): (Arc<ShadowKVStore<Box<dyn KVStore>>>, Arc<dyn utils::LVMStoreFactory>) =
172 match &config.storage.db {
173 DB::MemDb => {
174 tracing::warn!("using MemDB, all data will be lost upon exit");
175 let store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new()) as Box<dyn KVStore>));
176 (store, Arc::new(utils::LVMMemStore))
177 }
178 DB::RocksDb { .. } => {
179 let base_dir = resolved_db_dir.clone();
180 let store = Arc::new(ShadowKVStore::new(
181 Box::new(RocksDB::new(&base_dir.join(profile.id()))?) as Box<dyn KVStore>,
182 ));
183 let lyq_store = Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
184 store.clone(),
185 subkey().lyquid(),
186 ))));
187
188 if started_anvil {
189 let state_file = anvil_state_path(&base_dir, profile.id());
190 if state_file.exists() {
191 tracing::info!("Using Anvil state file {}.", state_file.display());
192 } else {
193 tracing::info!(
194 "No existing Anvil state file found at {}, starting with fresh state.",
195 state_file.display()
196 );
197 }
198 }
199
200 (store, lyq_store)
201 }
202 };
203 let discovery =
208 lyquor_lib::discovery::LyquidDiscovery::new(*profile.bartender_id(), sequence_backend_bartender_addr);
209 let discovery_handle = discovery.handle();
210 sys.discovery = Some(discovery);
211
212 let eth_backend = setup_eth_backend(
213 &profile,
214 discovery_handle.contract_registry_service(),
215 jsonrpc_client.clone(),
216 shutdown.child_token(),
217 )
218 .await?;
219 sys.eth_backend = Some(eth_backend.clone());
220 let SideEffectRuntime {
221 store: side_effect_store,
222 provider: side_effect_provider,
223 } = build_side_effect_runtime(config.as_ref(), store.clone(), subkey().side_effect_store())?;
224 let archive_inbound_store = (!side_effect_provider.is_selective_hosting()).then(|| side_effect_store.clone());
225
226 let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
227 let fco = FCO::new(
228 eth_backend.clone(),
229 Arc::new(fco_store),
230 Some(profile.init_chain_position()),
231 side_effect_provider,
232 archive_inbound_store,
233 )?;
234 let fco_handle = fco.handle();
235 sys.fco = Some(fco);
236 let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
241 let vm_log_store: Arc<dyn lyquor_state::SimpleStateStore> =
242 Arc::new(lyquor_state::DBSimpleStateStore::new(log_store));
243 let log = LyquidLog::new(vm_log_store.clone());
244 let log_hub = log.handle();
245 sys.log = Some(log);
246
247 let mut lvm = lyquid::LVM::new(
248 vm_log_store,
249 network.clone(),
250 Some(lyquor_vm::Sequencer {
251 inter: lyquor_api::interface::InterCallService::new(fco_handle.clone()),
252 submit: lyquor_api::interface::SubmitService::new(fco_handle.clone()),
253 fetch_oracle_info: lyquor_api::interface::OracleInfoService::new(fco_handle.clone()),
254 }),
255 );
256 for peer in &config.peers {
260 lvm.upc_mut()
261 .add_remote(peer.node_id, Some(peer.upc_addr.clone()))
262 .await
263 .with_context(|| format!("Failed to add UPC peer {}", peer.node_id))?;
264 }
265 let image_repo = Arc::new(
269 DirectoryRepo::new(&resolved_image_repo_dir)
270 .await
271 .context("Failed to initialize image store.")?,
272 );
273 let image_repo_for_resolver: Arc<dyn LyquidImageRepo> = image_repo.clone();
274 let image_repo_for_http: Arc<dyn OciDistributionStore> = Arc::new(DirectoryOciStore::new(image_repo.clone()));
275 let image_resolver = Arc::new(ImageResolver::new(
276 image_repo_for_resolver,
277 config.image.fallback_repos.clone(),
278 ));
279 let bartender_id = *profile.bartender_id();
280 let pool = lyquid::LyquidPool::new(
281 bartender_id,
282 lvm,
283 lyquid::LyquidPoolSetup {
284 fco: fco_handle.clone(),
285 store_factory: lyq_store,
286 image_resolver,
287 side_effect_store,
288 log_hub: log_hub.clone(),
289 sequence_backend_chain_id,
290 sequence_backend_bartender_addr,
291 config: lyquid::LyquidConfig::builder().build(),
292 },
293 )
294 .await?;
295 let pool_handle = pool.handle();
296 sys.pool = Some(pool);
297 let register_topic = LyteLog::tagged_value_topic("Register");
301 let register_count = log_hub.get_number_of_records(bartender_id, register_topic);
302
303 let node_service_context = http::NodeServiceContext::new(config.clone(), network.clone());
304 let mut api = lyquor_lib::http::HttpServer::new(
306 &config.network.api_addr,
307 jsonrpc_client.clone(),
308 fco_handle.clone(),
309 pool_handle.clone(),
310 node_service_context,
311 image_repo_for_http,
312 shutdown.child_token(),
313 http_tls,
314 )
315 .await?;
316 api.start();
317 sys.api = Some(api);
318
319 if register_count == 0 {
321 tracing::info!("Waiting for bartender to be registered.");
322 log_hub
323 .wait_next(bartender_id, register_topic)
324 .await
325 .context("Failed to wait for bartender registration.")?;
326 }
327 let bartender_ins = pool_handle
329 .get_bartender()
330 .await
331 .context("Failed to obtain bartender.")?;
332 let bartender_ln = bartender_ins.instance().max_number().unwrap_or(LyquidNumber::ZERO);
333 tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
334 bartender_ins
335 .wait_until(bartender_ln)
336 .await
337 .context("Failed to wait for the initial re-execution of bartender.")?;
338
339 discovery_handle.bind_log_stream(
341 log_hub
342 .subscribe(bartender_id, register_topic)
343 .await
344 .context("Failed to subscribe to log system for LyquidDiscovery.")?,
345 );
346 discovery_handle
347 .set_pool(pool_handle.clone())
348 .await
349 .context("Failed to set pool in LyquidDiscovery.")?;
350
351 tracing::info!("INIT COMPLETE (bartender={bartender_id})");
352
353 let db = DBFlusher::new(0, store, fco_handle, Duration::from_secs(config.flush_interval_secs)).start();
355 sys.db = Some(db);
356
357 Ok(sys)
358 }
359}
360
361fn generate_unused_port() -> anyhow::Result<u16> {
362 let listener = std::net::TcpListener::bind("0.0.0.0:0").context("Failed to bind to local address.")?;
363 let port = listener.local_addr().context("Failed to get local address.")?.port();
364 Ok(port)
365}
366
367fn anvil_state_path(data_dir: &Path, profile_id: &str) -> PathBuf {
368 data_dir.join(profile_id).join(ANVIL_STATE_FILENAME)
369}
370
371fn start_devnet_anvil(state_file: Option<&Path>, state_interval_secs: u64) -> anyhow::Result<AnvilInstance> {
372 let anvil_port = generate_unused_port()?;
373 use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
376 let mut sigset = SigSet::empty();
377 sigset.add(Signal::SIGINT);
378 let mut old_mask = SigSet::empty();
379 signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
380 .context("Failed to mask SIGINT for anvil.")?;
381
382 let mut anvil_builder = Anvil::new().port(anvil_port).keep_stdout();
383 if let Some(state_file) = state_file {
384 if let Some(state_dir) = state_file.parent() {
385 std::fs::create_dir_all(state_dir)?;
386 }
387 anvil_builder = anvil_builder.arg("--state").arg(state_file.as_os_str());
388 if state_interval_secs > 0 {
389 anvil_builder = anvil_builder
390 .arg("--state-interval")
391 .arg(state_interval_secs.to_string());
392 }
393 }
394 let mut anvil = anvil_builder.try_spawn()?;
395
396 let stdout_reader = std::io::BufReader::new(
397 anvil
398 .child_mut()
399 .stdout
400 .take()
401 .context("Failed to read from anvil stdout.")?,
402 );
403
404 tokio::task::spawn_blocking(move || {
405 use std::io::BufRead;
406 let mut stdout_lines = stdout_reader.lines();
407 while let Some(line) = stdout_lines.next() {
408 tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
409 }
410 });
411
412 tracing::info!("Anvil started at port {}.", anvil_port);
413 signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
414 .context("Failed to restore old signal mask for anvil.")?;
415
416 Ok(anvil)
417}
418
419fn parse_override_value(raw: &str) -> Value {
420 if let Ok(v) = raw.parse::<bool>() {
421 return Value::from(v);
422 }
423 if let Ok(v) = raw.parse::<u64>() {
424 return Value::from(v);
425 }
426 if let Ok(v) = raw.parse::<i64>() {
427 return Value::from(v);
428 }
429 if let Ok(v) = raw.parse::<f64>() {
430 return Value::from(v);
431 }
432 Value::from(raw.to_string())
433}
434
435fn insert_override_path(overrides: &mut Map<String, Value>, key: &str, value: Value) -> anyhow::Result<()> {
436 let mut segments = key.split('.').peekable();
437 let mut current = overrides;
438
439 while let Some(segment) = segments.next() {
440 if segments.peek().is_none() {
441 current.insert(segment.to_string(), value);
442 return Ok(());
443 }
444
445 let entry = current
446 .entry(segment.to_string())
447 .or_insert_with(|| Value::Object(Map::<String, Value>::new()));
448
449 match entry {
450 Value::Object(dict) => {
451 current = dict;
452 }
453 _ => anyhow::bail!("Override path '{key}' conflicts with non-table value."),
454 }
455 }
456
457 Ok(())
458}
459
460fn build_config_overrides(matches: &clap::ArgMatches) -> anyhow::Result<Map<String, Value>> {
461 let mut overrides: Map<String, Value> = Map::new();
462
463 if let Some(entries) = matches.get_many::<String>("config-override") {
464 for entry in entries {
465 let (key, value) = entry
466 .split_once('=')
467 .ok_or_else(|| anyhow::anyhow!("Invalid --config-override '{entry}', expected key=value"))?;
468 insert_override_path(&mut overrides, key, parse_override_value(value))?;
469 }
470 }
471
472 Ok(overrides)
473}
474
475const NODE_RUNTIME_THREAD_NAME: &str = "lyquor-node";
476
477fn build_node_runtime() -> std::io::Result<tokio::runtime::Runtime> {
478 tokio::runtime::Builder::new_multi_thread()
479 .enable_all()
480 .thread_name(NODE_RUNTIME_THREAD_NAME)
481 .build()
482}
483
484fn main() -> anyhow::Result<()> {
485 std::panic::set_hook(Box::new(|_| {
487 if let Some(&pid) = ANVIL_PID.get() {
488 tracing::warn!("Panic detected, killing anvil process {}", pid);
489 use nix::sys::signal::{Signal, kill};
490 let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
491 }
492 }));
493
494 lyquor_cli::setup_tracing()?;
495
496 println!("{}", lyquor_cli::format_logo_banner());
497
498 let runtime = build_node_runtime().context("Failed to build node Tokio runtime")?;
499
500 runtime.block_on(run_node())
501}
502
503async fn run_node() -> anyhow::Result<()> {
504 let matches = clap::command!()
505 .version(lyquor_cli::build_version())
506 .propagate_version(true)
507 .arg(clap::arg!(--config <PATH> "Path to the Lyquor config file (toml, yaml, json).").required(false))
508 .arg(
509 clap::arg!(--"config-override" <KEY_VALUE> "Override config with key=value (repeatable).")
510 .required(false)
511 .action(clap::ArgAction::Append)
512 .value_parser(clap::builder::NonEmptyStringValueParser::new()),
513 )
514 .get_matches();
515
516 let config_path = matches.get_one::<String>("config").map(PathBuf::from);
517 let overrides = build_config_overrides(&matches)?;
518 let config = NodeConfig::load(config_path, overrides)?;
519
520 let mut _anvil: Option<AnvilInstance> = None;
521 let seq_eth_url = match config.profile.base {
522 NetworkType::Devnet => {
523 match config.profile.sequencer.as_deref() {
524 None => {
525 let state_file = match &config.storage.db {
526 DB::RocksDb { .. } => Some(anvil_state_path(
527 &config.resolved_db_dir(),
528 config.profile.base.as_str(),
529 )),
530 DB::MemDb => None,
531 };
532 let anvil = start_devnet_anvil(state_file.as_deref(), config.flush_interval_secs)?;
533 ANVIL_PID.set(anvil.child().id()).ok();
535 let ep = anvil.ws_endpoint();
536 _anvil = Some(anvil);
537 ep
538 }
539 Some(url) => url.to_string(),
540 }
541 }
542 _ => return Err(anyhow::anyhow!("Network is not supported.")),
543 };
544 let profile: Arc<LyquorProfile> = Arc::new(config.profile.resolve(seq_eth_url.clone())?);
545 let node_seed = config.node_key.seed_bytes()?;
546 let (_, tls_config) = lyquor_tls::generator::single_node_config_with_seed(&node_seed, profile.dns_suffix());
547
548 if let (Some(external_host), Some(barstrainer_addr)) = (
549 config.network.external_host.as_deref(),
550 config.network.barstrainer_addr.as_deref(),
551 ) {
552 match SigningIdentity::from_tls_config(&tls_config) {
553 Ok(barstrainer_identity) => {
554 let dns_suffix = profile.dns_suffix().to_string();
555 let external_host = external_host.to_string();
556 let barstrainer_addr = barstrainer_addr.to_string();
557 tokio::spawn(async move {
558 if let Err(err) =
559 publish_external_host_dns(&barstrainer_identity, &dns_suffix, &external_host, &barstrainer_addr)
560 .await
561 {
562 tracing::warn!(
563 error = ?err,
564 external_host = %external_host,
565 barstrainer_addr = %barstrainer_addr,
566 "Failed to publish node DNS record"
567 );
568 }
569 });
570 }
571 Err(err) => {
572 tracing::warn!(
573 error = ?err,
574 "Skipping external DNS publication because signing identity could not be built"
575 );
576 }
577 }
578 } else {
579 if config.network.external_host.is_some() {
580 tracing::warn!("Skipping external DNS publication because network.barstrainer_addr is not configured");
581 }
582 }
583
584 let shutdown = CancellationToken::new();
585
586 let http_tls = if let Some(mut acme_config) = config.network.acme.clone() {
589 acme_config.storage_path = config
590 .resolved_acme_storage_path()
591 .expect("resolved ACME storage path should exist when ACME is configured");
592 let acme = lyquor_lib::acme::Acme::new_from_acme_config(
593 acme_config,
594 config.network.barstrainer_addr.clone().unwrap_or_default(),
595 profile.dns_suffix().to_owned(),
596 &tls_config,
597 );
598
599 let http_tls = if let Ok(acme) = acme {
600 let domain = barstrainer_client::fqdn_for_node(&tls_config.node_id(), profile.dns_suffix(), None);
601 let domains = vec![domain.clone()];
602
603 Ok(http::httptls::HttpTls::new(acme, domains).await)
604 } else {
605 Err(anyhow::anyhow!("Failed to construct ACME config"))
606 };
607
608 match http_tls {
609 Ok(t) => Some(t),
610 Err(err) => {
611 tracing::warn!("Failed to initialize HTTP TLS for api server: {:?}", err);
612 None
613 }
614 }
615 } else {
616 None
617 };
618
619 let config = Arc::new(config);
621 let mut network = lyquor_net::hub::HubBuilder::new(tls_config, shutdown.child_token())
622 .listen_addr(config.network.upc_addr.clone())
623 .build()
624 .context("Failed to setup network.")?;
625 network.start().await.context("Failed to start network.")?;
626 let network = Arc::new(network);
627
628 let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
631 let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
632
633 let mut node = Node {
634 sys: Subsystems::default(),
635 config,
636 profile: profile.clone(),
637 seq_eth_url,
638 http_tls,
639 started_anvil: _anvil.is_some(),
640 network: Some(network.clone()),
641 shutdown,
642 }
643 .start()
644 .await?;
645
646 tokio::select! {
647 _ = sigint.recv() => {
648 tracing::info!("received SIGINT");
649 }
650 _ = sigterm.recv() => {
651 tracing::info!("received SIGTERM");
652 }
653 }
654
655 node.shutdown().await;
656 network.wait_for_shutdown().await;
657 Ok(())
658}